一、概述
Redis 是一个开源(BSD许可)的,内存中的数据结构存储系统,它可以用作数据库、缓存和消息中间件。 它支持多种类型的数据结构,如 字符串(strings), 散列(hashes), 列表(lists), 集合(sets), 有序集合(sorted sets) 与范围查询, bitmaps, hyperloglogs 和 地理空间(geospatial) 索引半径查询。 Redis 内置了 复制(replication),LUA脚本(Lua scripting), LRU驱动事件(LRU eviction),事务(transactions) 和不同级别的 磁盘持久化(persistence), 并通过 Redis哨兵(Sentinel)和自动 分区(Cluster)提供高可用性(high availability)。
Redis命令参考文档网址:http://redisdoc.com
1.1 常用数据结构
Redis中的数据,总体上是键值对,不同数据类型指的是键值对中值的类型。值类型有String、list、set、hash、zset。
Redis 的key 是字符串类型,但是key 中不能包括边界字符,由于key 不是binary safe的字符串,所以像"my key"和"mykey\n"这样包含空格和换行的key 是不允许的。
- String:Redis中最基本的类型,它是key对应的一个单一值。二进制安全,不必担心由于编码等问题导致二进制数据变化。所以redis的string可以包含任何数据,比如jpg图片或者序列化的对象。Redis中一个字符串值的最大容量是512M。
- list:Redis 列表是简单的字符串列表,按照插入顺序排序。你可以添加一个元素到列表的头部(左边)或者尾部(右边)。说明它的底层是基于链表实现的,所以它操作时头尾效率高,中间效率低。
- set: Redis的set是string类型的无序集合。它是基于哈希表实现的。
- hash: 本身就是一个键值对集合。可以当做Java中的Map<String,Object>对待。
- zset: Redis zset 和 set 一样也是string类型元素的集合,且不允许重复的成员。不同的是每个元素都会关联一个double类型的分数。redis正是通过分数来为集合中的成员进行从小到大的排序。zset的成员是唯一的,但分数(score)却可以重复。
1.2 安装后的可执行文件
redis-benchmark ----性能测试工具
redis-check-aof ----AOF文件修复工具
redis-check-dump ----RDB文件检查工具(快照持久化文件)
redis-cli ----命令行客户端
redis-server ----redis服务器启动命令
1.3 配置文件
拷贝 cp /opt/redis-4.0.2/redis.conf /usr/local/redis/
修改配置项
| 配置项名称 | 作用 | 取值 |
|---|---|---|
| daemonize | 控制是否以守护进程形式运行Redis服务器 | yes |
| logfile | 指定日志文件位置 | "/var/logs/redis.log" |
| dir | Redis工作目录,设定当前服务文件保存位置,包含日志文件、持久化文件 | /usr/local/redis |
| bind | 绑定主机地址,允许访问的ip地址 | 127.0.0.1 |
| port | 服务器端口号 | 6379 |
| databases | 数据库数量 | 16 |
| loglevel | 设置服务器以指定日志记录级别 | debug |
| maxclients | 设置同一时间最大客户端连接数,默认无限制。当客户端连接到达上限,Redis会关闭新的连接 | 0 |
| timeout | 客户端闲置等待最大时长,达到最大值后关闭连接。如需关闭该功能,设置为 0 | 300 |
| include | 导入并加载指定配置文件信息,用于快速创建redis公共配置较多的redis实例配置文件,便于维护 | /path/server-端口号.conf |
1.4 命令操作
系统操作
# 清屏
<NolebasePageProperties />
clear
# 退出客户端
quit
exit
# 帮助
help 命令
help @组名
# 启动
redis-server
redis-server –-port 6379
# 指定配置文件启动
redis-server redis.conf
# redis客户端连接
redis-cli
redis-cli -h 127.0.0.1
redis-cli –port 6379
redis-cli -h 127.0.0.1 –port 6379
# RDB触发持久化
save数据库切换及发布订阅
# 使用select进行切换,数据库索引从0开始
select 0
# 订阅消息
SUBSCRIBE 消息主题
# 发布消息
PUBLISH 消息主题 消息内容查看数据库长度
# 查看数据库长度
dbsize
# 返回当前 redis 服务器状态和一些统计信息。
INFO
# 实时监听并返回redis服务器接收到的所有请求信息。
MONITOR
# 把数据同步保存到磁盘上,并关闭redis服务。
SHUTDOWN
# 获取一个 redis配置参数信息。(个别参数可能无法获取)
CONFIG GET parameter
# 设置一个 redis配置参数信息。(个别参数可能无法获取)
CONFIG SET parameter value
# 重置 INFO 命令的统计信息。(重置包括:Keyspace 命中数、
CONFIG RESETSTAT
# 错误数、处理命令数,接收连接数、过期key 数)
Keyspace
# 获取一个 key 的调试信息。
DEBUG OBJECT key
# 制造一次服务器当机。
DEBUG SEGFAULT
# 删除当前数据库中所有 key,此方法不会失败。小心慎用
FLUSHDB
# 删除全部数据库中所有 key,此方法不会失败。小心慎用
FLUSHALLKey操作命令
# 返回匹配指定模式的所有 key 规则 * 匹配任意数量的任意符号 ? 配合一个任意符号 [] 匹配一个指定符号
KEYS PATTERN
# 返回KEY对应的值的类型
TYPE KEY
# 把一组键值对数据移动到另一个数据库中
MOVE KEY DB
#根据KEY进行删除,至少要指定一个KEY
DEL KEY [KEY ...]
# 检查指定的KEY是否存在。指定一个KEY时,存在返回1,不存在返回0。可以指定多个,返回存在的KEY的数量。
EXISTS KEY
# 在现有的KEY中随机返回一个
RANDOMKEY
# 重命名一个KEY,NEWKEY不管是否是已经存在的都会执行,如果NEWKEY已经存在则会被覆盖。
RENAME KEY NEWKEY
# 只有在NEWKEY不存在时能够执行成功,否则失败
RENAMENX KEY NEWKEY
# 返回当前UNIX时间戳
TIME
# 以秒为单位查看KEY还能存在多长时间
TTL KEY
# 以毫秒为单位查看KEY还能存在多长时间
PTTL KEY
# 给一个KEY设置在SECONDS秒后过期,过期会被Redis移除。
EXPIRE KEY SECONDS
# 设置一个KEY在TIMESTAMP指定的时间过期
EXPIREAT KEY TIMESTAMP
# 以毫秒为单位指定过期时间
PEXPIRE KEY MILLISECONDS
# 以毫秒为单位指定过期的时间戳
PEXPIREAT KEY MILLISECONDS-TIMESTAMP
# 移除过期时间,变成永久key
PERSIST KEY
# 查询所有的键
keys *
# 通过索引选择数据库,默认连接的数据库是 0,默认数据库数是 16 个。返回 1表示成功,0 失败。
select db-index
# 将 key 从当前数据库移动到指定数据库。返回 1 表示成功。0 表示 key不存在或者已经在指定数据库中。
move key db-indexstring操作
# 给KEY设置一个string类型的值。
# EX参数用于设置存活的秒数。
# PX参数用于设置存活的毫秒数。
# NX参数表示当前命令中指定的KEY不存在才行。
# XX参数表示当前命令中指定的KEY存在才行。
SET KEY VALUE [EX SECONDS] [PX MILLISECONDS] [NX|XX]
# 根据key得到值,只能用于string类型。
GET KEY
# 把指定的value追加到KEY对应的原来的值后面,返回值是追加后字符串长度
APPEND KEY VALUE
# 直接返回字符串长度
STRLEN KEY
# 自增1
INCR KEY
# 自减1
DECR KEY
# 原值+INCREMENT
INCRBY KEY INCREMENT
# 原值-DECREMENT
DECRBY KEY DECREMENT
# 从字符串中取指定的一段
GETRANGE KEY START END
# 从offset开始使用VALUE进行替换
SETRANGE KEY OFFSET VALUE
# 设置KEY,VALUE时指定存在秒数
SETEX KEY SECONDS VALUE
# 新建字符串类型的键值对如果 key 不存在,设置 key 对应 string 类型的值。如果 key 已经存在,返回0。
SETNX KEY VALUE
# 一次性设置一组多个键值对
MSET KEY VALUE [KEY VALUE ...]
# 一次性指定多个KEY,返回它们对应的值,没有值的KEY返回值是(nil)
MGET KEY [KEY ...]
# 一次性新建多个值
MSETNX KEY VALUE [KEY VALUE ...]
# 设置新值,同时能够将旧值返回,如果 key不存在返回 nil。
GETSET KEY VALUElist操作
# 从左边插入
LPUSH key value [value ...]
# 从右边插入
RPUSH key value [value ...]
# 根据list集合的索引打印元素数据
# 正着数:0,1,2,3,...
# 倒着数:-1,-2,-3,...
LRANGE key start stop
# 返回key的长度
LLEN key
# 从左边弹出一个元素。弹出=返回+删除。
LPOP key
# 从右边弹出一个元素。
RPOP key
# 从source中RPOP一个元素,LPUSH到destination中
RPOPLPUSH source destination
# 根据索引从集合中取值
LINDEX key index
# 在pivot指定的值前面或后面插入value
LINSERT key BEFORE|AFTER pivot value
# 只能针对存在的list执行LPUSH
LPUSHX key value
# 根据count指定的数量从key对应的list中删除value
LREM key count value
# 把指定索引位置的元素替换为另一个值
LSET key index value
# 仅保留指定区间的数据,两边的数据被删除
LTRIM key start stopset操作
# 天机数据
SADD key member [member ...]
# 集合key中的所有成员
SMEMBERS key
# 返回集合中元素的数量
SCARD key
# 检查当前指定member是否是集合中的元素
SISMEMBER key member
# 从集合中删除元素
SREM key member [member ...]
# 将指定的集合进行“交集”操作
SINTER key [key ...]
# 取交集后存入destination
SINTERSTORE destination key [key ...]
# 将指定的集合执行“差集”操作
SDIFF key [key ...]
# 取差集后存入destination
SDIFFSTORE destination key [key ...]
# 将指定的集合执行“并集”操作
SUNION key [key ...]
# 取并集后存入destination
SUNIONSTORE destination key [key ...]
# 把member从source移动到destination
SMOVE source destination member
# 从集合中随机弹出count个数量的元素,count不指定就弹出1个
SPOP key [count]
# 从集合中随机返回count个数量的元素,count不指定就返回1个
SRANDMEMBER key [count]
# 基于游标的遍历
SSCAN key cursor [MATCH pattern] [COUNT count]hash操作
# 新增或更新key的值
HSET key field value
# 返回哈希表 key 中,所有的域和值。
HGETALL key
# 获取哈希表key field域的值
HGET key field
# 返回哈希表 key 中域的数量。
HLEN key
# 返回哈希表 key 中的所有域。
HKEYS key
# 返回哈希表 key 中所有域的值。
HVALS key
# 检查给定域 field 是否存在于哈希表 hash 当中。
HEXISTS key field
# 删除哈希表 key 中的一个或多个指定域,不存在的域将被忽略。
HDEL key field [field ...]
# 为哈希表 key 中的域 field 的值加上增量 increment 。
HINCRBY key field increment
# 返回哈希表 key 中,一个或多个给定域的值。
HMGET key field [field ...]
# 同时将多个 field-value (域-值)对设置到哈希表 key 中。
HMSET key field value [field value ...]
# 当且仅当域 field 尚未存在于哈希表的情况下, 将它的值设置为 value 。
HSETNX key field value
# Redis HSCAN 命令用于迭代哈希表中的键值对。
HSCAN key cursor [MATCH pattern] [COUNT count]zset操作
# 将一个或多个 member 元素及其 score 值加入到有序集 key 当中。
ZADD key [NX|XX] [CH] [INCR] score member [score member ...]
# 返回有序集 key 中,指定区间内的成员。
ZRANGE key start stop [WITHSCORES]
# 返回有序集 key 的基数。
ZCARD key
# 根据分数在min,max之间查找元素
ZCOUNT key min max
# 返回有序集 key 中,成员 member 的 score 值。
ZSCORE key member
# 为有序集 key 的成员 member 的 score 值加上增量 increment 。
ZINCRBY key increment member
# 对于一个所有成员的分值都相同的有序集合键 key 来说, 这个命令会返回该集合中, 成员介于 min 和 max 范围内的元素数量。
ZLEXCOUNT key min max
# 按照字母顺序在区间内返回member
# min和max使用“[a”表示闭区间,使用“(a”表示开区间
# -表示负无穷
# +表示正无穷
ZRANGEBYLEX key min max [LIMIT offset count]
# 在分数的指定区间内返回数据
ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]
# 先对分数进行升序排序,返回member的排名
ZRANK key member
# 移除有序集 key 中的一个或多个成员,不存在的成员将被忽略。
ZREM key member [member ...]
# 对于一个所有成员的分值都相同的有序集合键 key 来说, 这个命令会移除该集合中, 成员介于 min 和 max 范围内的所有元素。
ZREMRANGEBYLEX key min max
# 移除有序集 key 中,指定排名(rank)区间内的所有成员。
ZREMRANGEBYRANK key start stop
# 移除有序集 key 中,所有 score 值介于 min 和 max 之间(包括等于 min 或 max )的成员。
ZREMRANGEBYSCORE key min max
# 返回有序集 key 中,指定区间内的成员。
ZREVRANGE key start stop [WITHSCORES]
# 返回有序集 key 中, score 值介于 max 和 min 之间(默认包括等于 max 或 min )的所有的成员。有序集成员按 score 值递减(从大到小)的次序排列。
ZREVRANGEBYSCORE key max min [WITHSCORES] [LIMIT offset count]
# 返回有序集 key 中成员 member 的排名。其中有序集成员按 score 值递减(从大到小)排序。
ZREVRANK key member
# 计算给定的一个或多个有序集的交集,其中给定 key 的数量必须以 numkeys 参数指定,并将该交集(结果集)储存到 destination 。
ZINTERSTORE destination numkeys key [key ...] [WEIGHTS weight [weight ...]] [AGGREGATE SUM|MIN|MAX]
# 计算给定的一个或多个有序集的并集,其中给定 key 的数量必须以 numkeys 参数指定,并将该并集(结果集)储存到 destination 。
ZUNIONSTORE destination numkeys key [key ...] [WEIGHTS weight] [AGGREGATE SUM|MIN|MAX]
# 迭代有序集合中的元素(包括元素成员和元素分值)
ZSCAN key cursor [MATCH pattern] [COUNT count]高级数据类型 Bitmaps
#获取指定key对应偏移量上的bit值
getbit key offset
#设置指定key对应偏移量上的bit值,value只能是1或0
setbit key offset value
# 对指定key按位进行交、并、非、异或操作,并将结果保存到destKey中
# and:交
# or:并
# not:非
# xor:异或
bitop op destKey key1 [key2...]
# 统计指定key中1的数量
bitcount key [start end]
# 查找bit数组中指定范围内第一个0或1出现的位置
BITPOS
# 操作(查询、修改、自增)BitMap中bit数组中的指定位置(offset)的值
BITFIELD
# 获取BitMap中bit数组,并以十进制形式返回
BITFIELD_RO高级数据类型 HyperLogLog
基数是数据集去重后元素个数 , HyperLogLog 是用来做基数统计的,运用了LogLog的算法https://juejin.cn/post/6844903785744056333#heading-0。Redis中的HLL是基于string结构实现的,单个HLL的内存永远小于16kb
作用:•做海量数据的统计工作
优点:内存占用极低、性能非常好
缺点:•有一定的误差
- 用于进行基数统计,不是集合,不保存数据,只记录数量而不是具体数据
- 核心是基数估算算法,最终数值存在一定误差
- 误差范围:基数估计的结果是一个带有 0.81% 标准错误的近似值
- 耗空间极小,每个hyperloglog key占用了12K的内存用于标记基数
- pfadd命令不是一次性分配12K内存使用,会随着基数的增加内存逐渐增大
- Pfmerge命令合并后占用的存储空间为12K,无论合并之前数据量多少
# 添加数据
pfadd key element [element ...]
# 统计数据
pfcount key [key ...]
# 合并数据
pfmerge destkey sourcekey [sourcekey...]高级数据类型 GEO
# 添加坐标点
geoadd key longitude latitude member [longitude latitude member ...]
# 获取坐标点
geopos key member [member ...]
# 计算坐标点距离
geodist key member1 member2 [unit]
# 添加坐标点
georadius key longitude latitude radius m|km|ft|mi [withcoord] [withdist] [withhash] [count count]
# 获取坐标点
georadiusbymember key member radius m|km|ft|mi [withcoord] [withdist] [withhash] [count count]
# 计算经纬度
geohash key member [member ...]
# 在指定范围内搜索member,并按照与指定点之间的距离排序后返回。范围可以是圆形或矩形。6.2.新功能
GEOSEARCH
# 与GEOSEARCH功能一致,不过可以把结果存储到一个指定的key。 6.2.新功能
GEOSEARCHSTORE1.5 Redis单线程
保证高并发:
- 完全基于内存
- 数据结构简单,对数据操作也简单
- 使用多路 I/O 复用模型,充分利用CPU资源
单线程优势有下面几点:
- 代码更清晰,处理逻辑更简单
- 不用去考虑各种锁的问题,不存在加锁释放锁操作,没有因为锁而导致的性能消耗
- 不存在多进程或者多线程导致的CPU切换,充分利用CPU资源
1.6 Redis应用场景
(1)共享session
在分布式系统下,服务会部署在不同的tomcat,因此多个tomcat的session无法共享,以前存储在session中的数据无法实现共享,可以用redis代替session,解决分布式系统间数据共享问题。
(2)数据缓存
Redis采用内存存储,读写效率较高。我们可以把数据库的访问频率高的热点数据存储到redis中,这样用户请求时优先从redis中读取,减少数据库压力,提高并发能力。
(3)异步队列
Reids在内存存储引擎领域的一大优点是提供 list 和 set 操作,这使得Redis能作为一个很好的消息队列平台来使用。而且Redis中还有pub/sub这样的专用结构,用于1对N的消息通信模式。
(4)分布式锁
Redis中的乐观锁机制,可以帮助我们实现分布式锁的效果,用于解决分布式系统下的多线程安全问题
二、使用示例
2.1 与springboot整合
依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!--连接池依赖-->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>配置
spring:
redis:
host: localhost
port: 6379
password: 123321
lettuce:
pool:
max-active: 8 # 最大连接
max-idle: 8 # 最大空闲连接
min-idle: 0 # 最小空闲连接
max-wait: 100 # 连接等待时间配置类
/**
* redis配置
*
* @author ruoyi
*/
@Configuration
@EnableCaching
public class RedisConfig extends CachingConfigurerSupport {
@Bean
@SuppressWarnings(value = {"unchecked", "rawtypes"})
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
RedisTemplate<Object, Object> template = new RedisTemplate<>();
template.setConnectionFactory(connectionFactory);
//使用fastjson序列化
FastJson2JsonRedisSerializer serializer = new FastJson2JsonRedisSerializer(Object.class);
ObjectMapper mapper = new ObjectMapper();
mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
mapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.PROPERTY);
serializer.setObjectMapper(mapper);
template.setValueSerializer(serializer);
template.setHashValueSerializer(serializer);
// 使用StringRedisSerializer来序列化和反序列化redis的key值
template.setKeySerializer(new StringRedisSerializer());
template.setHashKeySerializer(RedisSerializer.string());
template.afterPropertiesSet();
return template;
}
}redis序列化使用fastjson
/**
* Redis使用FastJson序列化
*
* @author ruoyi
*/
public class FastJson2JsonRedisSerializer<T> implements RedisSerializer<T> {
@SuppressWarnings("unused")
private ObjectMapper objectMapper = new ObjectMapper();
public static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8");
private Class<T> clazz;
static {
ParserConfig.getGlobalInstance().setAutoTypeSupport(true);
}
public FastJson2JsonRedisSerializer(Class<T> clazz) {
super();
this.clazz = clazz;
}
@Override
public byte[] serialize(T t) throws SerializationException {
if (t == null) {
return new byte[0];
}
return JSON.toJSONString(t, SerializerFeature.WriteClassName).getBytes(DEFAULT_CHARSET);
}
@Override
public T deserialize(byte[] bytes) throws SerializationException {
if (bytes == null || bytes.length <= 0) {
return null;
}
String str = new String(bytes, DEFAULT_CHARSET);
return JSON.parseObject(str, clazz);
}
public void setObjectMapper(ObjectMapper objectMapper) {
Assert.notNull(objectMapper, "'objectMapper' must not be null");
this.objectMapper = objectMapper;
}
protected JavaType getJavaType(Class<?> clazz) {
return TypeFactory.defaultInstance().constructType(clazz);
}
}使用
通过注入RedisTemplate即可通过api进行操作,此外,springboot还提供了专用的API接口StringRedisTemplate。可以理解为这是RedisTemplate的一种指定数据泛型的操作API。
| API | 返回值类型 | 说明 |
|---|---|---|
| redisTemplate.opsForValue() | ValueOperations | 操作String类型数据 |
| redisTemplate.opsForHash() | HashOperations | 操作Hash类型数据 |
| redisTemplate.opsForList() | ListOperations | 操作List类型数据 |
| redisTemplate.opsForSet() | SetOperations | 操作Set类型数据 |
| redisTemplate.opsForZSet() | ZSetOperations | 操作SortedSet类型数据 |
| redisTemplate | 通用的命令 |
客户端选择
springboot整合redis技术提供了多种客户端兼容模式,默认提供的是lettucs客户端技术,也可以根据需要切换成指定客户端技术,如切换为jedis客户端。
添加jedis依赖
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</dependency>修改配置
spring:
redis:
host: localhost
port: 6379
client-type: jedis
lettuce:
pool:
max-active: 16
jedis:
pool:
max-active: 16lettcus与jedis区别
- jedis连接Redis服务器是直连模式,当多线程模式下使用jedis会存在线程安全问题,解决方案可以通过配置连接池使每个连接专用,这样整体性能就大受影响
- lettcus基于Netty框架进行与Redis服务器连接,底层设计中采用StatefulRedisConnection。 StatefulRedisConnection自身是线程安全的,可以保障并发访问安全问题,所以一个连接可以被多线程复用。当然lettcus也支持多连接实例一起工作
2.2 redis状态监控
@Autowired
private RedisTemplate<String, String> redisTemplate;
@GetMapping()
public AjaxResult getInfo() throws Exception {
Properties info = (Properties) redisTemplate.execute((RedisCallback<Object>) connection -> connection.info());
Properties commandStats = (Properties) redisTemplate.execute((RedisCallback<Object>) connection -> connection.info("commandstats"));
Object dbSize = redisTemplate.execute((RedisCallback<Object>) connection -> connection.dbSize());
Map<String, Object> result = new HashMap<>(3);
//连接信息
result.put("info", info);
//数据库大小
result.put("dbSize", dbSize);
//命令统计
List<Map<String, String>> pieList = new ArrayList<>();
commandStats.stringPropertyNames().forEach(key -> {
Map<String, String> data = new HashMap<>(2);
String property = commandStats.getProperty(key);
data.put("name", StringUtils.removeStart(key, "cmdstat_"));
data.put("value", StringUtils.substringBetween(property, "calls=", ",usec"));
pieList.add(data);
});
result.put("commandStats", pieList);
return AjaxResult.success(result);
}
/*
返回信息
Redis版本 {{ cache.info.redis_version }}
运行模式 {{ cache.info.redis_mode == "standalone" ? "单机" : "集群" }}
端口 {{ cache.info.tcp_port }}
客户端数 {{ cache.info.connected_clients }}
运行时间(天) {{ cache.info.uptime_in_days }}
使用内存 {{ cache.info.used_memory_human }}
使用CPU {{ parseFloat(cache.info.used_cpu_user_children).toFixed(2) }}
内存配置 {{ cache.info.maxmemory_human }}
AOF是否开启 {{ cache.info.aof_enabled == "0" ? "否" : "是" }}
RDB是否成功 {{ cache.info.rdb_last_bgsave_status }}
Key数量 {{ cache.dbSize }}
网络入口/出口 {{ cache.info.instantaneous_input_kbps }}kps/{{cache.info.instantaneous_output_kbps}}kps
*/2.3 使用Redisson操作
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。
官网地址: https://redisson.org
GitHub地址: https://github.com/redisson/redisson
依赖
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>配置类
@Configuration
public class RedisConfig {
@Bean
public RedissonClient redissonClient() {
// 配置类
Config config = new Config();
// 添加redis地址,这里添加了单点的地址,也可以使用config.useClusterServers()添加集群地址
config.useSingleServer().setAddress("redis://192.168.150.101:6379").setPassowrd("123321");
// 创建客户端
return Redisson.create(config);
}
}使用分布式锁
可重入原理:利用hash结构记录线程id和重入次数,是自己创建得锁才可重入或释放,释放/冲入对次数进行加/减1.
可重试:利用信号量和PubSub功能实现等待、唤醒,获取锁失败的重试机制
超时续约:利用watchDog,每隔一段时间(releaseTime / 3),重置超时时间
multiLock:多个独立的Redis节点,必须在所有节点都获取重入锁,才算获取锁成功
@Resource
private RedissonClient redissonClient;
@Test
void testRedisson() throws InterruptedException {
// 获取锁(可重入),指定锁的名称
RLock lock = redissonClient.getLock("anyLock");
// 尝试获取锁,参数分别是:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位
boolean isLock = lock.tryLock(1, 10, TimeUnit.SECONDS);
// 判断释放获取成功
if(isLock){
try {
System.out.println("执行业务");
}finally {
// 释放锁
lock.unlock();
}
}
}2.4 springboot操作集群
依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>配置
spring:
redis:
sentinel:
master: mymaster
nodes:
- 192.168.150.101:27001
- 192.168.150.101:27002
- 192.168.150.101:27003配置类
@Bean
public LettuceClientConfigurationBuilderCustomizer clientConfigurationBuilderCustomizer(){
// MASTER:从主节点读取
// MASTER_PREFERRED:优先从master节点读取,master不可用才读取replica
// REPLICA:从slave(replica)节点读取
// REPLICA _PREFERRED:优先从slave(replica)节点读取,所有的slave都不可用才读取master
return clientConfigurationBuilder -> clientConfigurationBuilder.readFrom(ReadFrom.REPLICA_PREFERRED);
}2.5过期数据(红包代金券订单)清除
在电商系统中,秒杀,抢购,红包优惠卷等操作,一般都会设置时间限制,比如订单15分钟不付款自动关闭,红包有效期24小时等等,使用定时任务,需要频繁执行,会造成很大的数据库压力,可以使用消息通知,当rediskey过期时,监听key过期的消息,根据key获取相关数据id,进行清除操作
springdataredis依赖
<!-- spring data redis相关坐标 beg -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.6.2</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.2</version>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>${springdataredis.version}</version>
</dependency>整合配置
<!-- springData Redis 核心api-->
<bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate">
<property name="connectionFactory" ref="connectionFactory"></property>
<property name="keySerializer">
<bean class="org.springframework.data.redis.serializer.StringRedisSerializer"></bean>
</property>
<property name="valueSerializer">
<bean class="org.springframework.data.redis.serializer.StringRedisSerializer"></bean>
</property>
</bean>
<!-- jedis连接工厂 -->
<bean id="connectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory">
<property name="hostName" value="127.0.0.1"></property>
<property name="port" value="6379"></property>
<property name="database" value="0"></property>
<property name="poolConfig" ref="poolConfig"></property>
</bean>
<!-- jedis连接池的配置信息 -->
<bean id="poolConfig" class="redis.clients.jedis.JedisPoolConfig">
<property name="maxIdle" value="5"></property>
<property name="maxTotal" value="10"></property>
<property name="testOnBorrow" value="true"></property>
</bean>监听redis消息
/**
* 消息监听器:需要实现MessageListener接口
* 实现onMessage方法
*/
public class RedisMessageListener implements MessageListener {
/**
* 处理redis消息:当从redis中获取消息后,打印主题名称和基本的消息
*/
public void onMessage(Message message, byte[] pattern) {
System.out.println("从channel为" + new String(message.getChannel())
+ "中获取了一条新的消息,消息内容:" + new String(message.getBody()));
}
}消息订阅配置
<!-- 配置处理消息的消息监听适配器 -->
<bean class="org.springframework.data.redis.listener.adapter.MessageListenerAdapter" id="messageListener">
<!-- 构造方法注入:自定义的消息监听 -->
<constructor-arg>
<bean class="cn.itcast.redis.listener.RedisKeyExpiredMessageDelegate"/>
</constructor-arg>
</bean>
<!-- 消息监听者容器:对所有的消息进行统一管理 -->
<bean class="org.springframework.data.redis.listener.RedisMessageListenerContainer" id="redisContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="messageListeners">
<map>
<!-- 配置频道与监听器
将此频道中的内容交由此监听器处理
key-ref:监听,处理消息
ChannelTopic:订阅的消息频道
__keyevent@0__:expired 配置订阅的主题名称
此名称时redis提供的名称,标识过期key消息通知
0表示db0 根据自己的dbindex选择合适的数字
-->
<entry key-ref="messageListener">
<list>
<bean class="org.springframework.data.redis.listener.ChannelTopic">
<constructor-arg value="__keyevent@0__:expired"></constructor-arg>
</bean>
</list>
</entry>
</map>
</property>
</bean>redis自带消息
在redis的内部当一个key失效时,也会向固定的频道中发送一条消息,我们只需要监听到此消息获取数据库中的id,再进行操作即可
修改 notify-keyspace-events Ex
# K 键空间通知,以__keyspace@<db>__为前缀
# E 键事件通知,以__keysevent@<db>__为前缀
# g del , expipre , rename 等类型无关的通用命令的通知, ...
# $ String命令
# l List命令
# s Set命令
# h Hash命令
# z 有序集合命令
# x 过期事件(每次key过期时生成)
# e 驱逐事件(当key在内存满了被清除时生成)
# A g$lshzxe的别名,因此”AKE”意味着所有的事件2.6 Jedis操作Redis
依赖
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>操作reids
// 命令式操作
//指定Redis服务器的IP地址和端口号
Jedis jedis = new Jedis("192.168.200.100", 6379);
//执行ping命令
String ping = jedis.ping();
System.out.println(ping);
// 通过 jedis可以设置 redis的主从关系:
jedis.slaveOf("192.168.1.35", 6379);
// 批处理
Pipeline p = jedis.pipelined();
p.set("fool", "bar");//.....一批命令
p.sync();
// 一些命令一起执行而不被干扰
jedis.watch (key1, key2, ...);
BinaryTransaction t = jedis.multi();
t.set("foo", "bar");
t.exec();
// 存储和读取redis数据
// 1) 字符串类型 string
//存储
jedis.set("username","zhangsan");
//获取
String username = jedis.get("username");
//可以使用setex()方法存储可以指定过期时间的 key value
jedis.setex("activecode",20,"hehe");//将activecode:hehe键值对存入redis,并且20秒后自动删除该键值对
// 2) 哈希类型 hash : map格式
// 存储hash
jedis.hset("user","name","lisi");
jedis.hset("user","age","23");
// 获取hash
String name = jedis.hget("user", "name");
// 获取hash的所有map中的数据
Map<String, String> user = jedis.hgetAll("user");
// 3) 列表类型 list : linkedlist格式。支持重复元素
// list 存储
jedis.lpush("mylist","a","b","c");//从左边存
jedis.rpush("mylist","a","b","c");//从右边存
// list 范围获取
List<String> mylist = jedis.lrange("mylist", 0, -1);
System.out.println(mylist);
// list 弹出
String element1 = jedis.lpop("mylist");//c
System.out.println(element1);
String element2 = jedis.rpop("mylist");//c
System.out.println(element2);
// 4) 集合类型 set : 不允许重复元素
// set 存储
jedis.sadd("myset","java","php","c++");
// set 获取
Set<String> myset = jedis.smembers("myset");
System.out.println(myset);
// 有序集合类型 sortedset:不允许重复元素,且元素有顺序
// sortedset 存储
jedis.zadd("mysortedset",3,"亚瑟");
jedis.zadd("mysortedset",30,"后裔");
jedis.zadd("mysortedset",55,"孙悟空");
// sortedset 获取
Set<String> mysortedset = jedis.zrange("mysortedset", 0, -1);
//关闭连接
jedis.close();订阅
class MyListener extends JedisPubSub {
public void onMessage(String channel, String message) {
}
public void onSubscribe(String channel, int subscribedChannels) {
}
public void onUnsubscribe(String channel, int subscribedChannels) { }
public void onPSubscribe(String pattern, int subscribedChannels) { }
public void onPUnsubscribe(String pattern, int subscribedChannels) { }
public void onPMessage(String pattern, String channel, String message) { }
}
// 使用
MyListener l = new MyListener();
jedis.subscribe(l, "foo");ShardedJedis
ShardedJedis 是一种帮助提高读/写并发能力的群集,群集使用一致性 hash 来确保一个key 始终被指向相同的redis server。每个redis server 被称为一个shard。是一个master。因此不能一些跨shard的操作是不能被执行的。如:transactions、pipelining、pub/sub 。
//定义shards:
List<JedisShardInfo> shards = new ArrayList<JedisShardInfo>();
JedisShardInfo si = new JedisShardInfo("localhost", 6379);
si.setPassword("foobared");
shards.add(si);
si = new JedisShardInfo("localhost", 6380);
si.setPassword("foobared");
shards.add(si);
//直接使用:
ShardedJedis jedis = new ShardedJedis(shards);
jedis.set("a", "foo");
jedis.disconnect;
//或 使用连接池
ShardedJedisPool pool = new ShardedJedisPool(new Config(), shards);
ShardedJedis jedis = pool.getResource();
jedis.set("a", "foo");
.... // do your work here
pool.returnResource(jedis);
.... // a few moments later
ShardedJedis jedis2 = pool.getResource();
jedis.set("z", "bar");
pool.returnResource(jedis);
pool.destroy();
//判断使用的是那个shards:
ShardInfo si = jedis.getShardInfo(key);
si.getHost/getPort/getPassword/getTimeout/getName
//可以通过keytags 来确保key 位于相同的shard。如:
ShardedJedis jedis = new ShardedJedis(shards, ShardedJedis.DEFAULT_KEY_TAG_PATTERN);
// 默认的keytags 是”{}”
jedis.set("foo{bar}", "12345");
jedis.set("car{bar}", "877878");ShardedJedisPipeline
ShardedJedisPipeline使ShardedJedis 有平滑的支持 redis 的 pipeline 的功能。
ShardedJedis 通过向每个用到的shard 发起 pipeline 来实现ShardedJedisPipeline的功能,但这种方式如果累积的 key 不够多,很难达到提高效率的目的。 如果需要在ShardedJedis 中使用pipeline,还是建议尽量通过keytag 将关联的key 放到同一shard 之中。
ShardedJedis jedis = new ShardedJedis(shards);
ShardedJedisPipeline p = jedis.pipelined();
p.set("foo", "bar");
p.get("foo");
List<Object> results = p.syncAndReturnAll();
//assertEquals(2, results.size());
//assertEquals("OK", results.get(0));
//assertEquals("bar", results.get(1));JedisPool
jedis 通过commons-pool 来提供其对象池的功能,其对象池类有JedisPool 和ShardedJedisPool,面向普通的redis 连接池和 pre-sharding 的redis 连接池。 通过 JedisPoolConfig 类完成连接池配置
//jedis 创建对象池的方式
JedisPool pool = new JedisPool(new JedisPoolConfig(), "localhost");
//2.获取连接
Jedis jedis = pool.getResource();
try {
/// ... do stuff here ... for example
jedis.set("foo", "bar");
String foobar = jedis.get("foo");
jedis.zadd("sose", 0, "car");
jedis.zadd("sose", 0, "bike");
Set<String> sose = jedis.zrange("sose", 0, -1);
} catch (JedisConnectionException e) {
// returnBrokenResource when the state of the object is unrecoverable
if (null != jedis) {
pool.returnBrokenResource(jedis);
jedis = null;
}
} finally {
/// ... it's important to return the Jedis instance to the pool once you've finished using it if (null != jedis)
pool.returnResource(jedis);
}
/// ... when closing your application:
pool.destroy();commons-pool 对象池配置
jedis 的对象池是通过 apache 的 commons-pool 实现的。其对象池的配置是通过org.apache.commons.pool.impl.GenericObjectPool.Config 类完成
maxActive 控制池中对象的最大数量。
默认值是8,如果是负值表示没限制。maxIdle 控制池中空闲的对象的最大数量。
默认值是8,如果是负值表示没限制。minIdle 控制池中空闲的对象的最小数量。
默认值是0。whenExhaustedAction 指定池中对象被消耗完以后的行为,有下面这些选择:
WHEN_EXHAUSTED_FAIL 0,当池中对象达到上限以后,继续 borrowObject 会抛出NoSuchElementException 异常。
WHEN_EXHAUSTED_GROW 2,当池中对象达到上限以后,会创建一个新对象,并返回它。
WHEN_EXHAUSTED_BLOCK 1,当池中对象达到上限以后,会一直等待,直到有一个对象可用。这个行为还与 maxWait 有关,如果 maxWait 是正数,那么会等待 maxWait 的毫秒的时间,超时会抛出NoSuchElementException 异常;如果 maxWait 为负值,会永久等待。
默认值是 WHEN_EXHAUSTED_BLOCK,maxWait 的默认值是-1。maxWait whenExhaustedAction 如果是 WHEN_EXHAUSTED_BLOCK,指定等待的毫秒数。如果 maxWait是正数,那么会等待maxWait 的毫秒的时间,超时会抛出NoSuchElementException 异常;如果maxWait 为负值,会永久等待。
maxWait 的默认值是-1。testOnBorrow 如果testOnBorrow 被设置,pool 会在borrowObject 返回对象之前使用PoolableObjectFactory的 validateObject 来验证这个对象是否有效,要是对象没通过验证,这个对象会被丢弃,然后重新选择一个新的对象。
testOnBorrow 的默认值是 false。testOnReturn 如果 testOnReturn 被设置,pool 会在 returnObject 的时候通过 PoolableObjectFactory 的validateObject 方法验证对象,如果对象没通过验证,对象会被丢弃,不会被放到池中。
testOnReturn 的默认值是 false。testWhileIdle 指定idle 对象是否应该使用 PoolableObjectFactory 的 validateObject 校验,如果校验失败,这个对象会从对象池中被清除。 这个设置仅在timeBetweenEvictionRunsMillis 被设置成正值(>0)的时候才会生效。
testWhileIdle 的默认值是 false。timeBetweenEvictionRunsMillis 指定驱逐线程的休眠时间。如果这个值不是正数(>0),不会有驱逐线程运行。
timeBetweenEvictionRunsMillis 的默认值是-1。numTestsPerEvictionRun 设置驱逐线程每次检测对象的数量。
这个设置仅在timeBetweenEvictionRunsMillis 被设置成正值(>0)的时候才会生效。
numTestsPerEvictionRun 的默认值是3。minEvictableIdleTimeMillis 指定最小的空闲驱逐的时间间隔(空闲超过指定的时间的对象,会被清除掉)。 这个设置仅在timeBetweenEvictionRunsMillis 被设置成正值(>0)的时候才会生效。
minEvictableIdleTimeMillis 默认值是30 分钟。softMinEvictableIdleTimeMillis 与 minEvictableIdleTimeMillis 类似,也是指定最小的空闲驱逐的时间间隔(空闲超过指定的时间的对象,会被清除掉),不过会参考minIdle 的值,只有 idle 对象的数量超过 minIdle 的值,对象才会被清除。
这个设置仅在 timeBetweenEvictionRunsMillis 被设置成正值(>0)的时候才会生效,并且这个配置能被 minEvictableIdleTimeMillis 配置取代(minEvictableIdleTimeMillis 配置项的优先级更高)。
softMinEvictableIdleTimeMillis 的默认值是-1。lifo pool 可以被配置成 LIFO 队列(last-in-first-out)或 FIFO 队列(first-in-first-out),来指定空闲对象被使用的次序。
lifo 的默认值是true。**JedisPoolConfig **
javapublic class JedisPoolConfig extends Config { public JedisPoolConfig() { // defaults to make your life with connection pool easier :) setTestWhileIdle(true); setMinEvictableIdleTimeMillis(60000); setTimeBetweenEvictionRunsMillis(30000); setNumTestsPerEvictionRun(-1); }
使用jedis访问集群
Set<HostAndPort> set = new HashSet<>();
set.add(new HostAndPort("192.168.139.132", 7001));
set.add(new HostAndPort("192.168.139.132", 7002));
set.add(new HostAndPort("192.168.139.132", 7003));
set.add(new HostAndPort("192.168.139.132", 7004));
set.add(new HostAndPort("192.168.139.132", 7005));
set.add(new HostAndPort("192.168.139.132", 7006));
JedisCluster cluster = new JedisCluster(set);
String result = cluster.get("a");
System.out.println(result);与spring整合配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.0.xsd">
<!-- 包扫描器,扫描带@Service注解的类 -->
<context:component-scan base-package="com.hg.rest.service"></context:component-scan>
<!-- 配置redis客户端单机版 -->
<bean id="jedisPool" class="redis.clients.jedis.JedisPool">
<constructor-arg name="host" value="192.168.198.153"></constructor-arg>
<constructor-arg name="port" value="6379"></constructor-arg>
</bean>
<!-- 配置redis客户端实现类 -->
<bean id="jedisClientSingle" class="com.hg.rest.component.impl.JedisClientSingle"/>
<!-- 配置redis客户端集群版 -->
<!-- <bean id="jedisCluster" class="redis.clients.jedis.JedisCluster">
<constructor-arg>
<set>
<bean class="redis.clients.jedis.HostAndPort">
<constructor-arg name="host" value="192.168.198.153"/>
<constructor-arg name="port" value="7001"/>
</bean>
<bean class="redis.clients.jedis.HostAndPort">
<constructor-arg name="host" value="192.168.198.153"/>
<constructor-arg name="port" value="7002"/>
</bean>
<bean class="redis.clients.jedis.HostAndPort">
<constructor-arg name="host" value="192.168.198.153"/>
<constructor-arg name="port" value="7003"/>
</bean>
<bean class="redis.clients.jedis.HostAndPort">
<constructor-arg name="host" value="192.168.198.153"/>
<constructor-arg name="port" value="7004"/>
</bean>
<bean class="redis.clients.jedis.HostAndPort">
<constructor-arg name="host" value="192.168.198.153"/>
<constructor-arg name="port" value="7005"/>
</bean>
<bean class="redis.clients.jedis.HostAndPort">
<constructor-arg name="host" value="192.168.198.153"/>
<constructor-arg name="port" value="7006"/>
</bean>
</set>
</constructor-arg>
</bean>
<bean id="jedisClientCluster" class="com.hg.rest.component.impl.JedisClientCluster"/> -->
</beans>三、核心知识
3.1 数据持久化
Redis工作时数据都存储在内存中,万一服务器断电,则所有数据都会丢失。针对这种情况,Redis采用持久化机制来增强数据安全性。
RDB
RDB全称Redis Database Backup file(Redis数据备份文件),也被叫做Redis数据快照。简单来说就是把内存中的所有数据都记录到磁盘中。当Redis实例故障重启后,从磁盘读取快照文件,恢复数据。快照文件称为RDB文件,默认是保存在当前运行目录。
每隔一定的时间把内存中的数据作为一个快照保存到硬盘上的文件中。Redis默认开启RDB机制。
- 触发方式
[1]默认配置
save 900 1
save 300 10
save 60 10000含义
| 配置 | 含义 |
|---|---|
| save 900 1 | 900秒内至少有一次修改则触发保存操作 |
| save 300 10 | 300秒内至少有10次修改则触发保存操作 |
| save 60 10000 | 60秒内至少有1万次修改则触发保存操作 |
[2]使用保存命令
save或bgsave
save命令会导致主进程执行RDB,这个过程中其它所有命令都会被阻塞。只有在数据迁移时可能用到。
bgsave执行后会开启独立进程完成RDB,主进程可以持续处理用户请求,不受影响。
[3]使用flushall命令
这个命令也会产生dump.rdb文件,但里面是空的,没有意义
[4]服务器关闭
如果执行SHUTDOWN命令让Redis正常退出,那么此前Redis就会执行一次持久化保存。
- 相关配置
| 配置项 | 取值 | 作用 |
|---|---|---|
| save | "" | 禁用RDB机制 |
| dbfilename | 文件名,例如:dump.rdb | 设置RDB机制下,数据存储文件的文件名 |
| dir | Redis工作目录路径 | 指定存放持久化文件的目录的路径。注意:这里指定的必须是目录不能是文件名 |
| rdbcompression | no | 是否压缩 ,建议不开启,压缩也会消耗cpu,磁盘的话不值钱 通常默认为开启状态,如果设置为no,可以节省 CPU 运行时间,但会使存储的文件变大(巨大) |
| rdbchecksum | yes | 设置是否进行RDB文件格式校验,该校验过程在写文件和读文件过程均进行 通常默认为开启状态,如果设置为no,可以节约读写性过程约10%时间消耗,但是存储一定的数据损坏风险 |
bgsave
bgsave开始时会fork主进程得到子进程,子进程共享主进程的内存数据。完成fork后读取内存数据并写入 RDB 文件。
stop-writes-on-bgsave-error yes配置,后台存储过程中如果出现错误现象,是否停止保存操作,默认为开启状态。
fork采用的是copy-on-write技术:
- 当主进程执行读操作时,访问共享内存;
- 当主进程执行写操作时,则会拷贝一份数据,执行写操作。

AOF
AOF(append only file)持久化:以独立日志的方式记录每次写命令,重启时再重新执行AOF文件中命令 达到恢复数据的目的。与RDB相比可以简单描述为改记录数据为记录数据产生的过程
AOF的主要作用是解决了数据持久化的实时性,目前已经是Redis持久化的主流方式
根据配置文件中指定的策略,把生成数据的命令保存到硬盘上的文件中。
- AOF基本配置
| 配置项 | 取值 | 作用 |
|---|---|---|
| appendonly | yes | 启用AOF持久化机制 |
| no | 禁用AOF持久化机制[默认值] | |
| appendfilename | "文件名" | AOF持久化文件名 |
| dir | Redis工作目录路径 | 指定存放持久化文件的目录的路径。注意:这里指定的必须是目录不能是文件名 |
| appendfsync | always | 每一次数据修改后都将执行文件写入操作,缓慢但是最安全。 |
| everysec | 每秒执行一次写入操作。折中。 | |
| no | 由操作系统在适当的时候执行写入操作,最快。 |
- 三种策略对比:

- AOF重写
对比下面两组命令:
| AOF重写前 | AOF重写后 |
|---|---|
| set count 1 incr count incr count incr count | set count 4 |
两组命令执行后对于count来说最终的值是一致的,但是进行AOF重写后省略了中间过程,可以让AOF文件体积更小。而Redis会根据AOF文件的体积来决定是否进行AOF重写。参考的配置项如下:
| 配置项 | 含义 |
|---|---|
| auto-aof-rewrite-percentage 100 | 文件体积增大100%时执行AOF重写 |
| auto-aof-rewrite-min-size 64mb | 文件体积增长到64mb时执行AOF重写 |
此外还有手动触发bgrewriteaof
实际工作中不要进行频繁的AOF重写,因为CPU资源和硬盘资源二者之间肯定是CPU资源更加宝贵,所以不应该过多耗费CPU性能去节省硬盘空间。
重写规则:
进程内已超时的数据不再写入文件
忽略无效指令,重写时使用进程内数据直接生成
对同一数据的多条写命令合并为一条命令
作用:
降低磁盘占用量,提高磁盘利用率 提高持久化效率,降低持久化写时间,提高IO性能 降低数据恢复用时,提高数据恢复效
持久化文件损坏修复
Redis服务器启动时如果读取了损坏的持久化文件会导致启动失败,此时为了让Redis服务器能够正常启动,需要对损坏的持久化文件进行修复。这里以AOF文件为例介绍修复操作的步骤。
第一步:备份要修复的appendonly.aof文件
第二步:执行修复程序
/usr/local/redis/bin/redis-check-aof --fix /usr/local/redis/appendonly.aof
第三步:重启Redis
注意:所谓修复持久化文件仅仅是把损坏的部分去掉,而没法把受损的数据找回。
同步文件策略
AOF缓冲区同步文件策略,由参数appendfsync控制
系统调用write和fsync说明:
write操作会触发延迟写(delayed write)机制,Linux在内核提供页缓冲区用 来提高硬盘IO性能。write操作在写入系统缓冲区后直接返回。同步硬盘操作依 赖于系统调度机制,列如:缓冲区页空间写满或达到特定时间周期。同步文件之 前,如果此时系统故障宕机,缓冲区内数据将丢失。
fsync针对单个文件操作(比如AOF文件),做强制硬盘同步,fsync将阻塞知道 写入硬盘完成后返回,保证了数据持久化。
两种持久化方式对比
| RDB | AOF | |
|---|---|---|
| 持久化方式 | 定时对整个内存做快照 | 记录每一次执行的命令 |
| 数据完整性 | 不完整,两次备份之间会丢失 | 相对完整,取决于刷盘策略 |
| 文件大小 | 会有压缩,文件体积小 | 记录命令,文件体积很大 |
| 宕机恢复速度 | 很快 | 慢 |
| 数据恢复优先级 | 低,因为数据完整性不如AOF | 高,因为数据完整性更高 |
| 系统资源占用 | 高,大量CPU和内存消耗 | 低,主要是磁盘IO资源 但AOF重写时会占用大量CPU和内存资源 |
| 使用场景 | 可以容忍数分钟的数据丢失,追求更快的启动速度 | 对数据安全性要求较高常见 |
RDB
优势:
- RDB是一个紧凑压缩的二进制文件,存储效率较高
- RDB内部存储的是redis在某个时间点的数据快照,非常适合用于数据备份,全量复制等场景
- RDB恢复数据的速度要比AOF快很多
- 应用:服务器中每X小时执行bgsave备份,并将RDB文件拷贝到远程机器中,用于灾难恢复。
劣势:
- RDB方式无论是执行指令还是利用配置,无法做到实时持久化,具有较大的可能性丢失数据
- bgsave指令每次运行要执行fork操作创建子进程,要牺牲掉一些性能
- Redis的众多版本中未进行RDB文件格式的版本统一,有可能出现各版本服务之间数据格式无法兼容现象
- 如果业务场景很看重数据的持久性 (durability),那么不应该采用 RDB 持久化。譬如说,如果 Redis 每 5 分钟执行一次 RDB 持久化,要是 Redis 意外奔溃了,那么最多会丢失 5 分钟的数据。
AOF
优势
选择appendfsync always方式运行时理论上能够做到数据完整一致,但此时性能又不好。文件内容具备一定可读性,能够用来分析Redis工作情况。
劣势
持久化相同的数据,文件体积比RDB大,恢复速度比RDB慢。效率在同步写入时低于RDB,不同步写入时与RDB相同。
- RDB和AOF并存
Redis重启的时候会优先载入AOF文件来恢复原始的数据,因为在通常情况下AOF文件保存的数据集要比RDB文件保存的数据集要完整
RDB的数据不实时,同时使用两者时服务器重启也只会找AOF文件。那要不要只使用AOF呢?作者建议不要,因为RDB更适合用于备份数据库(AOF在不断变化不好备份)、快速重启,而且不会有AOF可能潜在的bug,留着作为一个万一的手段。
- 使用建议
如果Redis仅仅作为缓存可以不使用任何持久化方式。
其他应用方式综合考虑性能和完整性、一致性要求。
RDB文件只用作后备用途,建议只在Slave上持久化RDB文件,而且只要15分钟备份一次就够了,只保留save 900 1这条规则。如果Enalbe AOF,好处是在最恶劣情况下也只会丢失不超过两秒数据,启动脚本较简单只load自己的AOF文件就可以了。代价一是带来了持续的IO,二是AOF rewrite的最后将rewrite过程中产生的新数据写到新文件造成的阻塞几乎是不可避免的。只要硬盘许可,应该尽量减少AOF rewrite的频率,AOF重写的基础大小默认值64M太小了,可以设到5G以上。默认超过原大小100%大小时重写可以改到适当的数值。如果不开启AOF,仅靠Master-Slave Replication 实现高可用性能也不错。能省掉一大笔IO也减少了rewrite时带来的系统波动。代价是如果Master/Slave同时倒掉,会丢失十几分钟的数据,启动脚本也要比较两个Master/Slave中的RDB文件,载入较新的那个。新浪微博就选用了这种架构。
对数据非常敏感,建议使用默认的AOF持久化方案 AOF持久化策略使用everysecond,每秒钟fsync一次。该策略redis仍可以保持很好的处理性能,当出 现问题时,最多丢失0-1秒内的数据。 注意:由于AOF文件存储体积较大,且恢复速度较慢 数据呈现阶段有效性,建议使用RDB持久化方案 数据可以良好的做到阶段内无丢失(该阶段是开发者或运维人员手工维护的),且恢复速度较快,阶段 点数据恢复通常采用RDB方案 注意:利用RDB实现紧凑的数据持久化会使Redis降的很低,慎重总结: 综合比对 RDB与AOF的选择实际上是在做一种权衡,每种都有利有弊 如不能承受数分钟以内的数据丢失,对业务数据非常敏感,选用AOF 如能承受数分钟以内的数据丢失,且追求大数据集的恢复速度,选用RDB 灾难恢复选用RDB 双保险策略,同时开启 RDB 和 AOF,重启后,Redis优先使用 AOF 来恢复数据,降低丢失数据的量
3.2 虚拟内存
redis 的虚拟内存与操作系统虚拟内存不是一码事,但是思路和目的都是相同的。就是暂时把不经常访问的数据从内存交换到磁盘中,从而腾出宝贵的内存空间。对于redis 这样的内存数据库,内存总是不够用的。除了可以将数据分割到多个redis 服务器以外。另外的能够提高数据库容量的办法就是使用虚拟内存技术把那些不经常访问的数据交换到磁盘上。如果我们存储的数据总是有少部分数据被经常访问,大部分数据很少被访问,对于网站来说确实总是只有少量用户经常活跃。当少量数据被经常访问时,使用虚拟内存不但能提高单台redis 数据库服务器的容量,而且也不会对性能造成太多影响。
redis 没有使用操作系统提供的虚拟内存机制而是自己在用户态实现了自己的虚拟内存机制。
主要的理由有以下两点:
- 操作系统的虚拟内存是以4k/页为最小单位进行交换的。而redis 的大多数对象都远小于4k,所以一个操作系统页上可能有多个redis 对象。另外redis 的集合对象类型如list,set 可能存在于多个操作系统页上。最终可能造成只有10%的key 被经常访问,但是所有操作系统页都会被操作系统认为是活跃的,这样只有内存真正耗尽时操作系统才会进行页的交换。
- 相比操作系统的交换方式。redis 可以将被交换到磁盘的对象进行压缩,保存到磁盘的对象可以去除指针和对象元数据信息。一般压缩后的对象会比内存中的对象小10 倍。这样redis 的虚拟内存会比操作系统的虚拟内存少做很多IO 操作。
Redis 虚拟内存相关配置
vm-enabled yes #开启虚拟内存功能
vm-swap-file /tmp/redis.swap #交换出来value 保存的文件路径/tmp/redis.swap
vm-max-memory 268435456 #redis 使用的最大内存上限(256MB),超过上限后
redis 开始交换value 到磁盘swap 文件中。建议设置为系统空闲内存的60%-80%
vm-page-size 32 #每个redis 页的大小32 个字节
vm-pages 134217728 #最多在文件中使用多少个页,交换文件的大小=
(vm-page-size * vm-pages)4GB
vm-max-threads 8 #用于执行value 对象换入换出的工作线程数量。0表示不使用工作线程redis 的虚拟内存在设计上为了保证key 的查询速度,只会将value 交换到swap 文件中。如果是由于太多key 很小的value 造成的内存问题,那么redis 的虚拟内存并不能解决问题。和操作系统一样redis 也是按页来交换对象的。redis 规定同一个页只能保存一个对象。但是一个对象可以保存在多个页中。在redis 使用的内存没超过vm-max-memory 之前是不会交换任何value 的。当超过最大内存限制后,redis 会选择把较老的对象交换到swap文件中去。如果两个对象一样老会优先交换比较大的对象, 精确的交换计算公式swappability = age*log(size_in_memory)。对于vm-page-size 的设置应该根据自己应用将页的大小设置为可以容纳大多数对象的尺寸。太大了会浪费磁盘空间,太小了会造成交换文件出现过多碎片。对于交换文件中的每个页,redis 会在内存中用一个1bit 值来对应记录页的空闲状态。所以像上面配置中页数量(vm-pages 134217728 )会占用16MB 内存用来记录页的空闲状态。vm-max-threads 表示用做交换任务的工作线程数量。如果大于0 推荐设为服务器的cpu 的核心数。如果是0 则交换过程在主线程进行。
redis 虚拟内存工作方式简介
当vm-max-threads 设为0 时(阻塞方式)
换出
主线程定期检查发现内存超出最大上限后,会直接以阻塞的方式,将选中的对象保存到swap文件中,并释放对象占用的内存空间,此过程会一直重复直到下面条件满足 1.内存使用降到最大限制以下 2.swap 文件满了。 3.几乎全部的对象都被交换到磁盘了
换入
当有客户端请求已经被换出的value 时,主线程会以阻塞的方式从swap 文件中加载对应的value 对象,加载时此时会阻塞所有客户端。然后处理该客户端的请求
当vm-max-threads 大于0 时(工作线程方式)
换出
当主线程检测到使用内存超过最大上限,会将选中要交换的对象信息放到一个队列中交给工作线程后台处理,主线程会继续处理客户端请求。
换入
如果有客户端请求的key 已经被换出了,主线程会先阻塞发出命令的客户端,然后将加载对象的信息放到一个队列中,让工作线程去加载。加载完毕后工作线程通知主线程。主线程再执行客户端的命令。这种方式只阻塞请求的value 是已经被换出key 的客户端。
总的来说阻塞方式的性能会好一些,因为不需要线程同步、创建线程和恢复被阻塞的客户端等开销。但是也相应的牺牲了响应性。工作线程方式主线程不会阻塞在磁盘IO 上,所以响应性更好。如果我们的应用不太经常发生换入换出,而且也不太在意有点延迟的话推荐使用阻塞方式。
3.3 事务
Redis会将一个事务中的所有命令序列化,然后按顺序执行。但是Redis事务不支持回滚操作,命令运行出错后,正确的命令会继续执行。
Redis事务其实是把一系列Redis命令放入队列,然后批量执行,执行过程中不会有其它事务来打断。不过与关系型数据库的事务不同,Redis事务不支持回滚操作,事务中某个命令执行失败,其它命令依然会执行。
为了弥补不能回滚的问题,Redis会在事务入队时就检查命令,如果命令异常则会放弃整个事务。
因此,只要程序员编程是正确的,理论上说Redis会正确执行所有事务,无需回滚。
如果事务执行一半的时候Redis宕机时。
Redis有持久化机制,因为可靠性问题,我们一般使用AOF持久化。事务的所有命令也会写入AOF文件,但是如果在执行EXEC命令之前,Redis已经宕机,则AOF文件中事务不完整。使用 redis-check-aof 程序可以移除 AOF 文件中不完整事务的信息,确保服务器可以顺利启动。
Redis事务控制的相关命令
| 命令名 | 作用 |
|---|---|
| MULTI | 用于开启一个事务,它总是返回OK。表示开始收集命令,后面所有命令都不是马上执行,而是加入到一个队列中。 |
| EXEC | 执行MULTI后面命令队列中的所有命令。返回所有命令的返回值。事务执行过程中,Redis不会执行其它事务的命令。 |
| DISCARD | 放弃执行队列中的命令。 并且客户端会从事务状态中退出 |
| WATCH | Redis的乐观锁机制,“观察“、”监控“一个KEY,在当前队列外的其他命令操作这个KEY时,放弃执行自己队列的命令 |
| UNWATCH | 放弃监控一个KEY |
- 执行 EXEC 之前,入队的命令可能会出错。比如说,命令可能会产生语法错误(参数数量错误,参数名错误,等等),或者其他更严重的错误,比如内存不足(如果服务器使用
maxmemory设置了最大内存限制的话)。- Redis 2.6.5 开始,服务器会对命令入队失败的情况进行记录,并在客户端调用 EXEC 命令时,拒绝执行并自动放弃这个事务。
- 命令可能在 EXEC 调用之后失败。举个例子,事务中的命令可能处理了错误类型的键,比如将列表命令用在了字符串键上面,诸如此类。
- 即使事务中有某个/某些命令在执行时产生了错误, 事务中的其他命令仍然会继续执行,不会回滚。
命令队列执行失败的两种情况
- 加入队列时失败
127.0.0.1:6379> multi
OK
127.0.0.1:6379> set age 20
QUEUED
127.0.0.1:6379> incr age
QUEUED
127.0.0.1:6379> incr age www
(error) ERR wrong number of arguments for 'incr' command
127.0.0.1:6379> exec
(error) EXECABORT Transaction discarded because of previous errors.遇到了入队时即可检测到的错误,整个队列都不会执行。
- 执行队列时失败
127.0.0.1:6379> multi
OK
127.0.0.1:6379> set age 30
QUEUED
127.0.0.1:6379> incrby age 5
QUEUED
127.0.0.1:6379> incrby age 5
QUEUED
127.0.0.1:6379> incrby age ww
QUEUED
127.0.0.1:6379> incrby age 5
QUEUED
127.0.0.1:6379> EXEC
1) OK
2) (integer) 35
3) (integer) 40
4) (error) ERR value is not an integer or out of range
5) (integer) 45
127.0.0.1:6379> get age
"45"错误在入队时检测不出来,整个队列执行时有错的命令执行失败,但是其他命令并没有回滚。
Redis为什么不支持回滚
官方解释如下:
如果你有使用关系式数据库的经验, 那么 “Redis 在事务失败时不进行回滚,而是继续执行余下的命令”这种做法可能会让你觉得有点奇怪。以下是这种做法的优点:
1.Redis 命令只会因为错误的语法而失败(并且这些问题不能在入队时发现),或是命令用在了错误类型的键上面:这也就是说,从实用性的角度来说,失败的命令是由编程错误造成的,而这些错误应该在开发的过程中被发现,而不应该出现在生产环境中。
2.因为不需要对回滚进行支持,所以 Redis 的内部可以保持简单且快速。
有种观点认为 Redis 处理事务的做法会产生 bug , 然而需要注意的是, 在通常情况下, 回滚并不能解决编程错误带来的问题。 举个例子, 如果你本来想通过 INCR 命令将键的值加上 1 , 却不小心加上了 2 , 又或者对错误类型的键执行了 INCR , 回滚是没有办法处理这些情况的。锁:
# 对 key 添加监视锁,在执行exec前如果key发生了变化,终止事务执行
watch key1 [key2……]
# 取消对所有 key 的监视
unwatch3.4 key过期策略
为什么需要内存回收?
- 1、在Redis中,set指令可以指定key的过期时间,当过期时间到达以后,key就失效了;
- 2、Redis是基于内存操作的,所有的数据都是保存在内存中,一台机器的内存是有限且很宝贵的。
基于以上两点,为了保证Redis能继续提供可靠的服务,Redis需要一种机制清理掉不常用的、无效的、多余的数据,失效后的数据需要及时清理,这就需要内存回收了。
Redis的内存回收主要分为过期删除策略和内存淘汰策略两部分。
过期删除策略
删除达到过期时间的key。
- 1)定时删除
对于每一个设置了过期时间的key都会创建一个定时器,一旦到达过期时间就立即删除。该策略可以立即清除过期的数据,对内存较友好,但是缺点是占用了大量的CPU资源去处理过期的数据,会影响Redis的吞吐量和响应时间。
- 2)惰性删除
当访问一个key时,才判断该key是否过期,过期则删除。该策略能最大限度地节省CPU资源,但是对内存却十分不友好。有一种极端的情况是可能出现大量的过期key没有被再次访问,因此不会被清除,导致占用了大量的内存。
在计算机科学中,懒惰删除(英文:lazy deletion)指的是从一个散列表(也称哈希表)中删除元素的一种方法。在这个方法中,删除仅仅是指标记一个元素被删除,而不是整个清除它。被删除的位点在插入时被当作空元素,在搜索之时被当作已占据。
- 3)定期删除
每隔一段时间,扫描Redis中过期key字典,并清除部分过期的key。该策略是前两者的一个折中方案,还可以通过调整定时扫描的时间间隔和每次扫描的限定耗时,在不同情况下使得CPU和内存资源达到最优的平衡效果。
在Redis中,同时使用了定期删除和惰性删除。不过Redis定期删除采用的是随机抽取的方式删除部分Key,因此不能保证过期key 100%的删除。
Redis结合了定期删除和惰性删除,基本上能很好的处理过期数据的清理,但是实际上还是有点问题的,如果过期key较多,定期删除漏掉了一部分,而且也没有及时去查,即没有走惰性删除,那么就会有大量的过期key堆积在内存中,导致redis内存耗尽,当内存耗尽之后,有新的key到来会发生什么事呢?是直接抛弃还是其他措施呢?有什么办法可以接受更多的key?
内存淘汰策略
Redis的内存淘汰策略,是指内存达到maxmemory极限时,使用某种算法来决定清理掉哪些数据,以保证新数据的存入。
相关配置
| maxmemory | 最大可使用内存 占用物理内存的比例,默认值为0,表示不限制。 |
|---|---|
| maxmemory-samples | 每次选取待删除数据的个数 选取数据时并不会全库扫描,导致严重的性能消耗,降低读写性能。因此采用随机获取数据的方式作为待检测删除数据 |
| maxmemory-policy | 删除策略 达到最大内存后的,对被挑选出来的数据进行删除的策略 |
Redis的内存淘汰机制包括:
- noeviction: 当内存不足以容纳新写入数据时,新写入操作会报错。
- allkeys-lru:当内存不足以容纳新写入数据时,在键空间(
server.db[i].dict)中,移除最近最少使用的 key(这个是最常用的)。 - allkeys-random:当内存不足以容纳新写入数据时,在键空间(
server.db[i].dict)中,随机移除某个 key。 - volatile-lru:当内存不足以容纳新写入数据时,在设置了过期时间的键空间(
server.db[i].expires)中,移除最近最少使用的 key。 - allkeys-lfu:挑选最近使用次数最少的数据淘汰
- volatile-random:当内存不足以容纳新写入数据时,在设置了过期时间的键空间(
server.db[i].expires)中,随机移除某个 key。 - volatile-ttl:当内存不足以容纳新写入数据时,在设置了过期时间的键空间(
server.db[i].expires)中,有更早过期时间的 key 优先移除。
在配置文件中,通过maxmemory-policy可以配置要使用哪一个淘汰机制。
什么时候会进行淘汰?
Redis会在每一次处理命令的时候(processCommand函数调用freeMemoryIfNeeded)判断当前redis是否达到了内存的最大限制,如果达到限制,则使用对应的算法去处理需要删除的key。
在淘汰key时,Redis默认最常用的是LRU算法(Latest Recently Used)。Redis通过在每一个redisObject保存lru属性来保存key最近的访问时间,在实现LRU算法时直接读取key的lru属性。
具体实现时,Redis遍历每一个db,从每一个db中随机抽取一批样本key,默认是3个key,再从这3个key中,删除最近最少使用的key。
总结
Redis过期策略包含定期删除和惰性删除两部分。定期删除是在Redis内部有一个定时任务,会定期删除一些过期的key。惰性删除是当用户查询某个Key时,会检查这个Key是否已经过期,如果没过期则返回用户,如果过期则删除。
但是这两个策略都无法保证过期key一定删除,漏网之鱼越来越多,还可能导致内存溢出。当发生内存不足问题时,Redis还会做内存回收。内存回收采用LRU策略,就是最近最少使用。其原理就是记录每个Key的最近使用时间,内存回收时,随机抽取一些Key,比较其使用时间,把最老的几个删除。
Redis的逻辑是:最近使用过的,很可能再次被使用
3.5 Redis的缓存击穿、缓存雪崩、缓存穿透
1)缓存穿透
参考资料:
什么是缓存穿透
- 正常情况下,我们去查询数据都是存在。那么请求去查询一条压根儿数据库中根本就不存在的数据,也就是缓存和数据库都查询不到这条数据,但是请求每次都会打到数据库上面去。这种查询不存在数据的现象我们称为缓存穿透。
穿透带来的问题
- 试想一下,如果有黑客会对你的系统进行攻击,拿一个不存在的id 去查询数据,会产生大量的请求到数据库去查询。可能会导致你的数据库由于压力过大而宕掉。
解决办法
- 缓存空值:之所以会发生穿透,就是因为缓存中没有存储这些空数据的key。从而导致每次查询都到数据库去了。那么我们就可以为这些key对应的值设置为null 丢到缓存里面去。后面再出现查询这个key 的请求的时候,直接返回null 。这样,就不用在到数据库中去走一圈了,但是别忘了设置过期时间。
- BloomFilter(布隆过滤):将所有可能存在的数据哈希到一个足够大的bitmap中,一个一定不存在的数据会被 这个bitmap拦截掉,从而避免了对底层存储系统的查询压力。在缓存之前在加一层 BloomFilter ,在查询的时候先去 BloomFilter 去查询 key 是否存在,如果不存在就直接返回,存在再走查缓存 -> 查 DB。
总结
缓存穿透有两种解决方案:其一是把不存在的key设置null值到缓存中。其二是使用布隆过滤器,在查询缓存前先通过布隆过滤器判断key是否存在,存在再去查询缓存。
设置null值可能被恶意针对,攻击者使用大量不存在的不重复key ,那么方案一就会缓存大量不存在key数据。此时我们还可以对Key规定格式模板,然后对不存在的key做正则规范匹配,如果完全不符合就不用存null值到redis,而是直接返回错误。
2)缓存击穿
- 什么是缓存击穿?
key可能会在某些时间点被超高并发地访问,是一种非常“热点”的数据。这个时候,需要考虑一个问题:缓存被“击穿”的问题。
当这个key在失效的瞬间,redis查询失败,持续的大并发就穿破缓存,直接请求数据库,就像在一个屏障上凿开了一个洞。
- 解决方案:
- 使用互斥锁(mutex key):mutex,就是互斥。简单地来说,就是在缓存失效的时候(判断拿出来的值为空),不是立即去load db,而是先使用Redis的SETNX去set一个互斥key,当操作返回成功时,再进行load db的操作并回设缓存;否则,就重试整个get缓存的方法。SETNX,是「SET if Not eXists」的缩写,也就是只有不存在的时候才设置,可以利用它来实现互斥的效果。
- 软过期:也就是逻辑过期,不使用redis提供的过期时间,而是业务层在数据中存储过期时间信息。查询时由业务程序判断是否过期,如果数据即将过期时,将缓存的时效延长,程序可以派遣一个线程去数据库中获取最新的数据,其他线程这时看到延长了的过期时间,就会继续使用旧数据,等派遣的线程获取最新数据后再更新缓存。
推荐使用互斥锁,因为软过期会有业务逻辑侵入和额外的判断。
总结
缓存击穿主要担心的是某个Key过期,更新缓存时引起对数据库的突发高并发访问。因此我们可以在更新缓存时采用互斥锁控制,只允许一个线程去更新缓存,其它线程等待并重新读取缓存。例如Redis的setnx命令就能实现互斥效果。
3)缓存雪崩
相关资料:
缓存雪崩,是指在某一个时间段,缓存集中过期失效。对这批数据的访问查询,都落到了数据库上,对于数据库而言,就会产生周期性的压力波峰。
解决方案:
- 数据分类分批处理:采取不同分类数据,缓存不同周期
- 相同分类数据:采用固定时长加随机数方式设置缓存
- 热点数据缓存时间长一些,冷门数据缓存时间短一些
- 避免redis节点宕机引起雪崩,搭建主从集群,保证高可用
- 构建多级缓存架构 Nginx缓存+redis缓存+ehcache缓存
- 限流、降级 短时间范围内牺牲一些客户体验,限制一部分请求访问,降低应用服务器压力,待业务低速运转后再逐步放开访
4)缓存预热
缓存预热就是系统启动前,提前将相关的缓存数据直接加载到缓存系统。避免在用户请求的时候,先查询数据库,然后再将数据缓 存的问题!用户直接查询事先被预热的缓存数据
前置准备工作:
- 日常例行统计数据访问记录,统计访问频度较高的热点数据
- 利用LRU数据删除策略,构建数据留存队列 例如:storm与kafka配合
准备工作:
- 将统计结果中的数据分类,根据级别,redis优先加载级别较高的热点数据
- 利用分布式多服务器同时进行数据读取,提速数据加载过程
- 热点数据主从同时预热
实施:
- 使用脚本程序固定触发数据预热过程
- 如果条件允许,使用了CDN(内容分发网络),效果会更好
总结
解决缓存雪崩问题的关键是让缓存Key的过期时间分散。因此我们可以把数据按照业务分类,然后设置不同过期时间。相同业务类型的key,设置固定时长加随机数。尽可能保证每个Key的过期时间都不相同。
另外,Redis宕机也可能导致缓存雪崩,因此我们还要搭建Redis主从集群及哨兵监控,保证Redis的高可用。
3.6 缓存冷热数据分离
Redis使用的是内存存储,当需要海量数据存储时,成本非常高。
经过调研发现,当前主流DDR3内存和主流SATA SSD的单位成本价格差距大概在20倍左右,为了优化redis机器综合成本,我们考虑实现基于热度统计 的数据分级存储及数据在RAM/FLASH之间的动态交换,从而大幅度降低成本,达到性能与成本的高平衡。
基本思路:基于key访问次数(LFU)的热度统计算法识别出热点数据,并将热点数据保留在redis中,对于无访问/访问次数少的数据则转存到SSD上,如果SSD上的key再次变热,则重新将其加载到redis内存中。
目前流行的高性能磁盘存储,并且遵循Redis协议的方案包括:
- SSDB:http://ssdb.io/zh_cn/
- RocksDB:https://rocksdb.org.cn/
因此,我们就需要在应用程序与缓存服务之间引入代理,实现Redis和SSD之间的切换
这样的代理方案阿里云提供的就有。当然也有一些开源方案,例如:https://github.com/JingchengLi/swapdb
3.7 分布式锁
分布式锁要满足的条件:
- 多进程互斥:同一时刻,只有一个进程可以获取锁
- 保证锁可以释放:任务结束或出现异常,锁一定要释放,避免死锁
- 阻塞锁(可选):获取锁失败时可否重试
- 重入锁(可选):获取锁的代码递归调用时,依然可以获取锁
1)最基本的分布式锁:
利用Redis的setnx命令,这个命令的特征时如果多次执行,只有第一次执行会成功,可以实现互斥的效果。但是为了保证服务宕机时也可以释放锁,需要利用expire命令给锁设置一个有效期
setnx lock thread-01 # 尝试获取锁
expire lock 10 # 设置有效期问题1:如果expire之前服务宕机怎么办?
要保证setnx和expire命令的原子性。redis的set命令可以满足:
set key value [NX] [EX time]需要添加nx和ex的选项:
- NX:与setnx一致,第一次执行成功
- EX:设置过期时间
问题2:释放锁的时候,如果自己的锁已经过期了,此时会出现安全漏洞,如何解决?
在锁中存储当前进程和线程标识,释放锁时对锁的标识判断,如果是自己的则删除,不是则放弃操作。
但是这两步操作要保证原子性,需要通过Lua脚本来实现。
if redis.call("get",KEYS[1]) == ARGV[1] then
redis.call("del",KEYS[1])
end2)可重入分布式锁
如果有重入的需求,则除了在锁中记录进程标识,还要记录重试次数,流程如下:

下面我们假设锁的key为“lock”,hashKey是当前线程的id:“threadId”,锁自动释放时间假设为20
获取锁的步骤:
- 1、判断lock是否存在
EXISTS lock- 存在,说明有人获取锁了,下面判断是不是自己的锁
- 判断当前线程id作为hashKey是否存在:
HEXISTS lock threadId- 不存在,说明锁已经有了,且不是自己获取的,锁获取失败,end
- 存在,说明是自己获取的锁,重入次数+1:
HINCRBY lock threadId 1,去到步骤3
- 判断当前线程id作为hashKey是否存在:
- 2、不存在,说明可以获取锁,
HSET key threadId 1 - 3、设置锁自动释放时间,
EXPIRE lock 20
- 存在,说明有人获取锁了,下面判断是不是自己的锁
释放锁的步骤:
- 1、判断当前线程id作为hashKey是否存在:
HEXISTS lock threadId- 不存在,说明锁已经失效,不用管了
- 存在,说明锁还在,重入次数减1:
HINCRBY lock threadId -1,获取新的重入次数
- 2、判断重入次数是否为0:
- 为0,说明锁全部释放,删除key:
DEL lock - 大于0,说明锁还在使用,重置有效时间:
EXPIRE lock 20
- 为0,说明锁全部释放,删除key:
对应的Lua脚本如下:
首先是获取锁:
local key = KEYS[1]; -- 锁的key
local threadId = ARGV[1]; -- 线程唯一标识
local releaseTime = ARGV[2]; -- 锁的自动释放时间
if(redis.call('exists', key) == 0) then -- 判断是否存在
redis.call('hset', key, threadId, '1'); -- 不存在, 获取锁
redis.call('expire', key, releaseTime); -- 设置有效期
return 1; -- 返回结果
end;
if(redis.call('hexists', key, threadId) == 1) then -- 锁已经存在,判断threadId是否是自己
redis.call('hincrby', key, threadId, '1'); -- 不存在, 获取锁,重入次数+1
redis.call('expire', key, releaseTime); -- 设置有效期
return 1; -- 返回结果
end;
return 0; -- 代码走到这里,说明获取锁的不是自己,获取锁失败然后是释放锁:
local key = KEYS[1]; -- 锁的key
local threadId = ARGV[1]; -- 线程唯一标识
local releaseTime = ARGV[2]; -- 锁的自动释放时间
if (redis.call('HEXISTS', key, threadId) == 0) then -- 判断当前锁是否还是被自己持有
return nil; -- 如果已经不是自己,则直接返回
end;
local count = redis.call('HINCRBY', key, threadId, -1); -- 是自己的锁,则重入次数-1
if (count > 0) then -- 判断是否重入次数是否已经为0
redis.call('EXPIRE', key, releaseTime); -- 大于0说明不能释放锁,重置有效期然后返回
return nil;
else
redis.call('DEL', key); -- 等于0说明可以释放锁,直接删除
return nil;
end;3)高可用的锁
问题:redis分布式锁依赖与redis,如果redis宕机则锁失效。如何解决?
此时大多数同学会回答说:搭建主从集群,做数据备份。
这样就进入了陷阱,因为下一个问题就来了:
问题:如果搭建主从集群做数据备份时,进程A获取锁,master还没有把数据备份到slave,master宕机,slave升级为master,此时原来锁失效,其它进程也可以获取锁,出现安全问题。如何解决?
关于这个问题,Redis官网给出了解决方案,使用RedLock思路可以解决:
在Redis的分布式环境中,我们假设有N个Redis master。这些节点完全互相独立,不存在主从复制或者其他集群协调机制。之前我们已经描述了在Redis单实例下怎么安全地获取和释放锁。我们确保将在每(N)个实例上使用此方法获取和释放锁。在这个样例中,我们假设有5个Redis master节点,这是一个比较合理的设置,所以我们需要在5台机器上面或者5台虚拟机上面运行这些实例,这样保证他们不会同时都宕掉。
为了取到锁,客户端应该执行以下操作:
- 获取当前Unix时间,以毫秒为单位。
- 依次尝试从N个实例,使用相同的key和随机值获取锁。在步骤2,当向Redis设置锁时,客户端应该设置一个网络连接和响应超时时间,这个超时时间应该小于锁的失效时间。例如你的锁自动失效时间为10秒,则超时时间应该在5-50毫秒之间。这样可以避免服务器端Redis已经挂掉的情况下,客户端还在死死地等待响应结果。如果服务器端没有在规定时间内响应,客户端应该尽快尝试另外一个Redis实例。
- 客户端使用当前时间减去开始获取锁时间(步骤1记录的时间)就得到获取锁使用的时间。当且仅当从大多数(这里是3个节点)的Redis节点都取到锁,并且使用的时间小于锁失效时间时,锁才算获取成功。
- 如果取到了锁,key的真正有效时间等于有效时间减去获取锁所使用的时间(步骤3计算的结果)。
- 如果因为某些原因,获取锁失败(没有在至少N/2+1个Redis实例取到锁或者取锁时间已经超过了有效时间),客户端应该在所有的Redis实例上进行解锁(即便某些Redis实例根本就没有加锁成功)。
3.8 如何实现数据库与缓存数据一致
- 本地缓存同步:当前微服务的数据库数据与缓存数据同步,可以直接在数据库修改时加入对Redis的修改逻辑,保证一致。
- 跨服务缓存同步:服务A调用了服务B,并对查询结果缓存。服务B数据库修改,可以通过MQ通知服务A,服务A修改Redis缓存数据
- 通用方案:使用Canal框架,伪装成MySQL的salve节点,监听MySQL的binLog变化,然后修改Redis缓存数据
3.9 监控
监控指标
性能指标:Performance
latency Redis响应一个请求的时间 instantaneous_ops_per_sec 平均每秒处理请求总数 hit rate(calculated) 缓存命中率(计算出来的) 内存指标:Memory
used_memory 已使用内存 mem_fragmentation_ratio 内存碎片率 evicted_keys 由于最大内存限制被移除的key的数量 blocked_clients 由于BLPOP,BRPOP,OR BRPOPLPUSH而被阻塞的客户端 基本活动指标:Basic activity
connected_clients 客户端连接数 connected_slaves Slave数量 master_last_io_seconds_ago 最近一次主从交互之后的秒数 keyspace 数据库中的key值总数 持久性指标:Persistence
rdb_last_save_time 最后一次持久化保存到磁盘的时间戳 rdb_changes_since_last_save 自最后一次持久化依赖数据库的更改数 错误指标:Erro
rejected_connections 由于达到maxclient限制而被拒绝的连接数 keyspace_misses Key值查找失败(没有命中)的次数 master_link_down_since_seconds 主从断开的持续时间(以秒为单位)
监控方式
工具
Cloud Insight Redis、Prometheus 、Redis-stat 、 Redis-faina 、RedisLive 、 zabbix
命令
benchmark、redis cli ( monitor 、showlog)
benchmark
# 命令
# -h 指定服务器主机 127.0.0.1
# -p 服务器端口 6379
# -s 服务器socket
# -c 并发连接数 50
# -n 请求数 10000
# -d 以字节的形式指定set/get值得数据大小 2
# -k 1=keep alive 0=reconnect 1
# -r set/get/incr使用随机key,sadd使用随机值
# -P 通过管道传输<numreq>请求 1
# -q 强制推出redis,仅显示query/sec值
# --csv 以csv格式输出
# -l 生成循环,永久执行测试
# -t 仅运行以逗号分隔得测试命令列表
# -I Idle模式,进打开N个idle连接并等待
redis-benchmark [-h ] [-p ] [-c ] [-n <requests]> [-k ]
# 范例1 说明:50个连接,10000次请求对应的性能
redis-benchmark
# 范例2 说明:100个连接,5000次请求对应的性能
redis-benchmark -c 100 -n 5000monitor
#打印服务器调试信息
monitorshowlong
# get :获取慢查询日志
# len :获取慢查询日志条目数
# reset :重置慢查询日志
showlong [operator]
# 相关配置
slowlog-log-slower-than 1000 #设置慢查询的时间下线,单位:微妙
slowlog-max-len 100 #设置慢查询命令对应的日志显示长度,单位:命令数3.10 订阅
PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。
优点:•采用发布订阅模型,支持多生产、多消费
缺点:不支持数据持久化、无法避免消息丢失、消息堆积有上限,超出时数据丢失
SUBSCRIBE channel [channel] :订阅一个或多个频道
PUBLISH channel msg :向一个频道发送消息
PSUBSCRIBE pattern[pattern] :订阅与pattern格式匹配的所有频道
3.11 消息队列Stream
Stream 是 Redis 5.0 引入的一种新数据类型,可以实现一个功能非常完善的消息队列。
特点:消息可回溯、一个消息可以被多个消费者读取、可以阻塞读取、有消息漏读的风险。
Redis Stream | 菜鸟教程 (runoob.com)
消费者组:将多个消费者划分到一个组中,监听同一个队列。实现消息分流,消息标示,消息确认。
特点:消息可回溯、可以多消费者争抢消息,加快消费速度、可以阻塞读取、没有消息漏读的风险、有消息确认机制,保证消息至少被消费一次
组相关操作
# 创建组
# key:队列名称
# groupName:消费者组名称
# ID:起始ID标示,$代表队列中最后一个消息,0则代表队列中第一个消息
# MKSTREAM:队列不存在时自动创建队列
XGROUP CREATE key groupName ID [MKSTREAM]
# 删除指定的消费者组
XGROUP DESTORY key groupName
# 给指定的消费者组添加消费者
XGROUP CREATECONSUMER key groupname consumername
# 删除消费者组中的指定消费者
XGROUP DELCONSUMER key groupname consumername读取数据
# 从消费者组读取消息
# group:消费组名称
# consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
# count:本次查询的最大数量
# BLOCK milliseconds:当没有消息时最长等待时间
# NOACK:无需手动ACK,获取到消息后自动确认
# STREAMS key:指定队列名称
# ID:获取消息的起始ID:
# ">":从下一个未消费的消息开始
# 其它:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]3.12 消息队列对比
| List | PubSub | Stream | |
|---|---|---|---|
| 消息持久化 | 支持 | 不支持 | 支持 |
| 阻塞读取 | 支持 | 支持 | 支持 |
| 消息堆积处理 | 受限于内存空间,可以利用多消费者加快处理 | 受限于消费者缓冲区 | 受限于队列长度,可以利用消费者组提高消费速度,减少堆积 |
| 消息确认机制 | 不支持 | 不支持 | 支持 |
| 消息回溯 | 不支持 | 不支持 | 支持 |
3.13 慢查询
Redis慢查询可以记录查询时长超过一定时间的查询,发人员和运维人员可以据此定位慢操作。
# 查询配置
config get slowlog*
# slowlog-max-len 慢查询的日志大小,当满了之后,旧日志会被删除,建议设计1000以上
# slowlog-log-slower-than 设置时间,单位是μm,当一条命令执行超过这个时间就会被记录到慢查询日志中
# 查询日志内容
# slowlog get [n] 获取前n条慢查询日志,不指定长度获取全部
#uid 日志唯一编号
#timestamp 开始执行这条命令的时间戳
#time len 执行时长
# command 执行的命令和参数
#client IP and port redis客戶端的地址和端口
# clientname redis客戶端名字
# 慢查询总条数
slowlog len
# 清空慢查询日志
slowlog reset3.14 redis性能测试
# 100个并发连接,100000个请求,检测host为127.0.0.1端口为6379的redis服务器性能
./redis-benchmark -h 127.0.0.1 -p 6379 -c 100 -n 100000
# 只测试某些操作的性能
./redis-benchmark -h 127.0.0.1 -p 6379 -t set,lpush -c 100 -n 100000 -q3.15 集群方式
Redis集群可以分为主从集群和分片集群两类。
主从集群一般一主多从,主库用来写数据,从库用来读数据。结合哨兵,可以再主库宕机时从新选主,目的是保证Redis的高可用。
分片集群是数据分片,我们会让多个Redis节点组成集群,并将16383个插槽分到不同的节点上。存储数据时利用对key做hash运算,得到插槽值后存储到对应的节点即可。因为存储数据面向的是插槽而非节点本身,因此可以做到集群动态伸缩。目的是让Redis能存储更多数据。
)主从集群
主从集群,也是读写分离集群。一般都是一主多从方式。
Redis 的复制(replication)功能允许用户根据一个 Redis 服务器来创建任意多个该服务器的复制品,其中被复制的服务器为主服务器(master),而通过复制创建出来的服务器复制品则为从服务器(slave)。
只要主从服务器之间的网络连接正常,主从服务器两者会具有相同的数据,主服务器就会一直将发生在自己身上的数据更新同步 给从服务器,从而一直保证主从服务器的数据相同。
- 写数据时只能通过主节点完成
- 读数据可以从任何节点完成
- 如果配置了
哨兵节点,当master宕机时,哨兵会从salve节点选出一个新的主。
2)分片集群
主从集群中,每个节点都要保存所有信息,容易形成木桶效应。并且当数据量较大时,单个机器无法满足需求。此时我们就要使用分片集群了。
集群特征:
每个节点都保存不同数据
所有的redis节点彼此互联(PING-PONG机制),内部使用二进制协议优化传输速度和带宽.
节点的fail是通过集群中超过半数的节点检测失效时才生效.
客户端与redis节点直连,不需要中间proxy层连接集群中任何一个可用节点都可以访问到数据
redis-cluster把所有的物理节点映射到[0-16383]slot(插槽)上,实现动态伸缩
为了保证Redis中每个节点的高可用,我们还可以给每个节点创建replication(slave节点)
3.16 集群主从同步、读写分离

过程介绍
当设置好slave 服务器后,slave 会建立和master 的连接,然后发送sync 命令。无论是第一次同步建立的连接还是连接断开后的重新连接,master 都会启动一个后台进程,将数据库快照保存到文件中,同时master 主进程会开始收集新的写命令并缓存起来。后台进程完成写文件后,master 就发送文件给slave,slave 将文件保存到磁盘上,然后加载到内存恢复数据库快照到slave 上。接着master 就会把缓存的命令转发给slave。而且后续master 收到的写命令都会通过开始建立的连接发送给slave。从master 到slave 的同步数据的命令和从客户端发送的命令使用相同的协议格式。当master 和slave 的连接断开时slave 可以自动重新建立连接。如果master 同时收到多个slave 发来的同步连接命令,只会启动一个进程来写数据库镜像,然后发送给所有slave。
可分为三个阶段
阶段一:建立连接阶段
建立slave到master的连接,使master能够识别slave,并保存slave端口号
阶段二:数据同步阶段工作流程
在slave初次连接master后,复制master中的所有数据到slave , 将slave的数据库状态更新成master当前的数据库状态
阶段三:命令传播阶段
当master数据库状态被修改后,导致主从服务器数据库状态不一致,此时需要让主从数据同步到一致的 状态,同步的动作称为命令传播
master将接收到的数据变更命令发送给slave,slave接收命令后执行命令
复制缓冲区
概念:复制缓冲区,又名复制积压缓冲区,是一个先进先出(FIFO)的队列,用于存储服务器执行过的命 令,每次传播命令,master都会将传播的命令记录下来,并存储在复制缓冲区 复制缓冲区默认数据存储空间大小是1M,由于存储空间大小是固定的,当入队元素的数量大于队 列长度时,最先入队的元素会被弹出,而新元素会被放入队列
由来:每台服务器启动时,如果开启有AOF或被连接成为master节点,即创建复制缓冲区
作用:用于保存master收到的所有指令(仅影响数据变更的指令,例如set,select)
数据来源:当master接收到主客户端的指令时,除了将指令执行,会将该指令存储到缓冲区中
读写分离的好处:
- 性能优化:主服务器专注于写操作,可以用更适合写入数据的模式工作;同样,从服务器专注于读操作,可以用更适合读取数据的模式工作。
- 强化数据安全,避免单点故障:由于数据同步机制的存在,各个服务器之间数据保持一致,所以其中某个服务器宕机不会导致数据丢失或无法访问。从这个角度说参与主从复制的Redis服务器构成了一个。
1. 全量同步
主从第一次建立连接时,会执行全量同步,将master节点的所有数据都拷贝给slave节点,流程:

这里有一个问题,master如何得知salve是第一次来连接呢??
有几个概念,可以作为判断依据:
- Replication Id:简称replid,是数据集的标记,id一致则说明是同一数据集。每一个master都有唯一的replid,slave则会继承master节点的replid
- offset:偏移量,随着记录在repl_baklog中的数据增多而逐渐增大。slave完成同步时也会记录当前同步的offset。如果slave的offset小于master的offset,说明slave数据落后于master,需要更新。
因此slave做数据同步,必须向master声明自己的replication id 和offset,master才可以判断到底需要同步哪些数据。
因为slave原本也是一个master,有自己的replid和offset,当第一次变成slave,与master建立连接时,发送的replid和offset是自己的replid和offset。
master判断发现slave发送来的replid与自己的不一致,说明这是一个全新的slave,就知道要做全量同步了。
master会将自己的replid和offset都发送给这个slave,slave保存这些信息。以后slave的replid就与master一致了。
因此,master判断一个节点是否是第一次同步的依据,就是看replid是否一致。
如图:

完整流程描述:
- slave节点请求增量同步
- master节点判断replid,发现不一致,拒绝增量同步
- master将完整内存数据生成RDB,发送RDB到slave
- slave清空本地数据,加载master的RDB
- master将RDB期间的命令记录在repl_baklog,并持续将log中的命令发送给slave
- slave执行接收到的命令,保持与master之间的同步
2.增量同步
全量同步需要先做RDB,然后将RDB文件通过网络传输个slave,成本太高了。因此除了第一次做全量同步,其它大多数时候slave与master都是做增量同步。
什么是增量同步?就是只更新slave与master存在差异的部分数据。如图:

那么master怎么知道slave与自己的数据差异在哪里呢?
3.repl_backlog原理
master怎么知道slave与自己的数据差异在哪里呢?
这就要说到全量同步时的repl_baklog文件了。
这个文件是一个固定大小的数组,只不过数组是环形,也就是说角标到达数组末尾后,会再次从0开始读写,这样数组头部的数据就会被覆盖。
repl_baklog中会记录Redis处理过的命令日志及offset,包括master当前的offset,和slave已经拷贝到的offset:

slave与master的offset之间的差异,就是salve需要增量拷贝的数据了。
随着不断有数据写入,master的offset逐渐变大,slave也不断的拷贝,追赶master的offset:

直到数组被填满:

此时,如果有新的数据写入,就会覆盖数组中的旧数据。不过,旧的数据只要是绿色的,说明是已经被同步到slave的数据,即便被覆盖了也没什么影响。因为未同步的仅仅是红色部分。
但是,如果slave出现网络阻塞,导致master的offset远远超过了slave的offset:

如果master继续写入新数据,其offset就会覆盖旧的数据,直到将slave现在的offset也覆盖:

棕色框中的红色部分,就是尚未同步,但是却已经被覆盖的数据。此时如果slave恢复,需要同步,却发现自己的offset都没有了,无法完成增量同步了。只能做全量同步。

4.主从同步优化
主从同步可以保证主从数据的一致性,非常重要。
可以从以下几个方面来优化Redis主从就集群:
- 在master中配置repl-diskless-sync yes启用无磁盘复制,避免全量同步时的磁盘IO。
- Redis单节点上的内存占用不要太大,减少RDB导致的过多磁盘IO
- 适当提高repl_baklog的大小,发现slave宕机时尽快实现故障恢复,尽可能避免全量同步
- 限制一个master上的slave节点数量,如果实在是太多slave,则可以采用主-从-从链式结构,减少master压力
主从从架构图:

5. 主从同步常见问题
1)频繁全量复制
伴随着系统的运行,master的数据量会越来越大,一旦master重启,runid将发生变化,会导致全部slave的 全量复制操作
内部优化调整方案:
- master内部创建master_replid变量,使用runid相同的策略生成,长度41位,并发送给所有slave
- 在master关闭时执行命令 shutdown save,进行RDB持久化,将runid与offset保存到RDB文件中
- repl-id repl-offset 通过redis-check-rdb命令可以查看该信息
- master重启后加载RDB文件,恢复数据 重启后,将RDB文件中保存的repl-id与repl-offset加载到内存中
- master_repl_id = repl master_repl_offset = repl-offset
- 通过info命令可以查看该信息
作用: 本机保存上次runid,重启后恢复该值,使所有slave认为还是之前的master
2)频繁的全量复制(2)
问题现象 : 网络环境不佳,出现网络中断,slave不提供服务
问题原因 :复制缓冲区过小,断网后slave的offset越界,触发全量复制
最终结果 :slave反复进行全量复制
解决方案 : 修改复制缓冲区大小 repl-backlog-size
建议设置如下:
- 测算从master到slave的重连平均时长second
- 获取master平均每秒产生写命令数据总量
write_size_per_second - 最优复制缓冲区空间 =
2 * second * write_size_per_second
3)频繁的网络中断(1)
问题现象 :master的CPU占用过高 或 slave频繁断开连接
问题原因 :slave每1秒发送REPLCONF ACK命令到master , 当slave接到了慢查询时(keys * ,hgetall等),会大量占用CPU性能 ,master每1秒调用复制定时函数replicationCron(),比对slave发现长时间没有进行响应
最终结果 :master各种资源(输出缓冲区、带宽、连接等)被严重占用
解决方案 :通过设置合理的超时时间,确认是否释放slave repl-timeout 该参数定义了超时时间的阈值(默认60秒),超过该值,释放slave
4)频繁的网络中断(2)
问题现象 : slave与master连接断开
问题原因 :master发送ping指令频度较低 ,master设定超时时间较短 , ping指令在网络中存在丢包
解决方案 : 提高ping指令发送的频度 repl-ping-slave-period超时时间repl-time的时间至少是ping指令频度的5到10倍,否则slave很容易判定超时
5)数据不一致
问题现象 :多个slave获取相同数据不同步
问题原因 : 网络信息不同步,数据发送有延迟
**解决方案 😗*优化主从间的网络环境,通常放置在同一个机房部署,如使用阿里云等云服务器时要注意此现象 ; 监控主从节点延迟(通过offset)判断,如果slave延迟过大,暂时屏蔽程序对该slave的数据访问 slave-serve-stale-data yes|no开启后仅响应info、slaveof等少数命令(慎用,除非对数据一致性要求很高)
3.17 哨兵
Redis提供了哨兵(Sentinel)机制来实现主从集群的自动故障恢复。哨兵也是一台redis服务器,只是不提供数据服务 通常哨兵配置数量为单数
哨兵(sentinel) 是一个分布式系统,用于对主从结构中的每台服务器进行监控,当出现故障时通过投票机制选择新的 master并将所有slave连接到新的master。
1.集群结构和作用
哨兵的结构如图:

哨兵的作用如下:
- 监控:Sentinel 会不断检查您的master和slave是否按预期工作
- 自动故障恢复:如果master故障,Sentinel会将一个slave提升为master。当故障实例恢复后也以新的master为主
- 通知:Sentinel充当Redis客户端的服务发现来源,当集群发生故障转移时,会将最新信息推送给Redis的客户端
2.集群监控原理
Sentinel基于心跳机制监测服务状态,每隔1秒向集群的每个实例发送ping命令:
•主观下线:如果某sentinel节点发现某实例未在规定时间响应,则认为该实例主观下线。
•客观下线:若超过指定数量(quorum)的sentinel都认为该实例主观下线,则该实例客观下线。quorum值最好超过Sentinel实例数量的一半。

3.集群故障恢复原理
一旦发现master故障,sentinel需要在salve中选择一个作为新的master,选择依据是这样的:
- 首先会判断slave节点与master节点断开时间长短,如果超过指定值(down-after-milliseconds * 10)则会排除该slave节点
- 然后判断slave节点的slave-priority值,越小优先级越高,如果是0则永不参与选举
- 如果slave-prority一样,则判断slave节点的offset值,越大说明数据越新,优先级越高
- 最后是判断slave节点的运行id大小,越小优先级越高。
当选出一个新的master后,该如何实现切换呢?
流程如下:
- sentinel给备选的slave1节点发送slaveof no one命令,让该节点成为master
- sentinel给所有其它slave发送slaveof 192.168.150.101 7002 命令,让这些slave成为新master的从节点,开始从新的master上同步数据。
- 最后,sentinel将故障节点标记为slave,当故障节点恢复后会自动成为新的master的slave节点

3.18 分片集群
主从和哨兵可以解决高可用、高并发读的问题。但是依然有两个问题没有解决:
- 海量数据存储问题
- 高并发写的问题
使用分片集群可以解决。
分片集群特征:
集群中有多个master,每个master保存不同数据
每个master都可以有多个slave节点
master之间通过ping监测彼此健康状态
客户端请求可以访问集群任意节点,最终都会被转发到正确节点
1. 原理
Redis会把每一个master节点映射到0~16383共16384个插槽(hash slot)上,查看集群信息时就能看到
数据key不是与节点绑定,而是与插槽绑定。redis会根据key的有效部分计算插槽值,分两种情况:
- key中包含"{}",且“{}”中至少包含1个字符,“{}”中的部分是有效部分
- key中不包含“{}”,整个key都是有效部分
例如:key是num,那么就根据num计算,如果是{itcast}num,则根据itcast计算。计算方式是利用CRC16算法得到一个hash值,然后对16384取余,得到的结果就是slot值。
2. 集群伸缩
# 添加新节点到redis
redis-cli --cluster add-node 192.168.150.101:7004 192.168.150.101:7001
# 查看集群状态 ,新加入的节点插槽数为0
redis-cli -p 7001 cluster nodes3. 转移插槽
# 查看num的插槽
get num
# 建立于redis的连接
redis-cli --cluster reshard ip:端口
# 输入转移数量
# 输入接收插槽的节点id
# 输入带转移插槽的id或all(全部节点各转移一部分)或done(没有)
# 输入yes确认3.19 故障转移
redis存在自动故障转移,提升slave为master。
也可使使用命令cluster failover手动使某个节点宕机,执行redis的自动故障转移。

failover命令可以指定三种模式:
- 缺省:默认的流程,如图1~6歩
- force:省略了对offset的一致性校验
- takeover:直接执行第5歩,忽略数据一致性、忽略master状态和其它master的意见
四、安装
4.1 常规安装
压缩包上传
安装C++编译环境 yum install -y gcc-c++
修改安装目录
# /src/Makefile
PREFIX?=/usr/local/redis编译安装 make install
修改redis.conf文件中的一些配置:
# 绑定地址,默认是127.0.0.1,会导致只能在本地访问。修改为0.0.0.0则可以在任意IP访问
bind 0.0.0.0
# 数据库数量,设置为1
databases 1启动
# 不加conf文件路径使用默认配置
/usr/local/redis/bin/redis-server redis.conf文件路径停止
/usr/local/redis/bin/redis-cli shutdown4.2 集群搭建
可以修改redis配置文件绑定ip
properties# redis实例的声明 IP replica-announce-ip 192.168.150.101配置
properties# 添加节点 cluster-enabled yes|no #cluster配置文件名,该文件属于自动生成,仅用于快速查找文件并查询文件内容 cluster-config-file <filename> # 节点服务响应超时时间,用于判定该节点是否下线或切换为从节点 cluster-node-timeout <milliseconds> #master连接的slave最小数量 cluster-migration-barrier <count>节点操作命令
sh# 查看集群节点信息 cluster nodes # 进入一个从节点 redis,切换其主节点 cluster replicate <master-id> # 发现一个新节点,新增主节点 cluster meet ip:port # 忽略一个没有solt的节点 cluster forget <id> # 手动故障转移 cluster failoverredis-trib命令
sh# 添加节点 redis-trib.rb add-node # 删除节点 redis-trib.rb del-node # 重新分片 redis-trib.rb reshard启动多个redis,使用命令创建主从关系。
shell# 查看主从关系 info replication # 设定主从关系 在从机上执行,设定主机,在主机上执行,设置从机 SLAVEOF 127.0.0.1 6000 # 取消主从关系 在从机上执 SLAVEOF NO ONE # 设置主从关系 # 从机启动时设置 redis-server -slaveof <masterip> <masterport> # 授权访问 # master客户端发送命令设置密码 requirepass <password> # master配置文件设置密码 config set requirepass <password> config get requirepass # slave客户端发送命令设置密码 auth <password> # slave配置文件设置密码 masterauth <password> # slave启动服务器设置密码 redis-server –a <password>哨兵模式
properties# vim /usr/local/cluster-redis/sentinel.conf # 端口 port 27001 sentinel monitor 为主机命名 主机IP 主机端口号 将主机判定为下线时需要Sentinel同意的数量 # 启动 /usr/local/redis/bin/redis-server /usr/local/cluster-redis/sentinel.conf --sentinel
4.3 分片集群
Redis5.0之前
Redis5.0之前集群命令都是用redis安装包下的src/redis-trib.rb来实现的。因为redis-trib.rb是有ruby语言编写的所以需要安装ruby环境。
# 安装依赖
yum -y install zlib ruby rubygems
gem install redis然后通过命令来管理集群:
# 进入redis的src目录
cd /tmp/redis-6.2.4/src
# 创建集群
./redis-trib.rb create --replicas 1 192.168.150.101:7001 192.168.150.101:7002 192.168.150.101:7003 192.168.150.101:8001 192.168.150.101:8002 192.168.150.101:80032)Redis5.0以后
我们使用的是Redis6.2.4版本,集群管理以及集成到了redis-cli中,格式如下:
redis-cli --cluster create --cluster-replicas 1 192.168.150.101:7001 192.168.150.101:7002 192.168.150.101:7003 192.168.150.101:8001 192.168.150.101:8002 192.168.150.101:8003命令说明:
redis-cli --cluster或者./redis-trib.rb:代表集群操作命令create:代表是创建集群--replicas 1或者--cluster-replicas 1:指定集群中每个master的副本个数为1,此时节点总数 ÷ (replicas + 1)得到的就是master的数量。因此节点列表中的前n个就是master,其它节点都是slave节点,随机分配到不同master
通过命令可以查看集群状态:
redis-cli -p 7001 cluster nodes4.4 docker安装
# 拉取镜像
docker pull redis
# 创建容器
docker run -di --name=myredis -p 6379:6379 redis4.5 linux添加服务
# vi /etc/systemd/system/redis.service
[Unit]
Description=redis-server
After=network.target
[Service]
Type=forking
ExecStart=/usr/local/bin/redis-server /usr/local/src/redis-6.2.6/redis.conf
PrivateTmp=true
[Install]
WantedBy=multi-user.target重载系统服务
systemctl daemon-reload操作redis
# 启动
systemctl start redis
# 停止
systemctl stop redis
# 重启
systemctl restart redis
# 查看状态
systemctl status redis
# 开机自启
systemctl enable redis五、关联程序
5.1 客户端
# options
#`-h 127.0.0.1`:指定要连接的redis节点的IP地址,默认是127.0.0.1
# `-p 6379`:指定要连接的redis节点的端口,默认是6379
# `-a 123321`:指定redis的访问密码 不指定可连接redis后通过 auth验证密码
# commonds Redis的操作命令 不指定时会进入`redis-cli`的交互控制台
redis-cli [options] [commonds]图形化客户端
GitHub上的大神编写了Redis的图形化桌面客户端,地址:https://github.com/uglide/RedisDesktopManager
不过该仓库提供的是RedisDesktopManager的源码,并未提供windows安装包。
这个仓库可以找到安装包:https://github.com/lework/RedisDesktopManager-Windows/releases
5.2 Twemproxy
Twemproxy是Twitter开发的一个redis代理proxy, Twemproxy通过引入一个代理层,可以将其后端的多台Redis或Memcached实例进行统一管理与分配,使应用程序只需要在Twemproxy上进行操作,而不用关心后面具体有多少个真实的Redis或Memcached存储.简单说,就是有了Twemproxy,客户端不直接访问Redis服务器,而是通过twemproxy 代理中间件间接访问
特性
1)支持失败节点自动删除
可以设置重新连接该节点的时间; 可以设置连接多少次之后删除该节点
2)支持设置HashTag
通过HashTag可以自己设定将两个key哈希到同一个实例上去
3)减少与redis的直接连接数
保持与redis的长连接; 减少了客户端直接与服务器连接的连接数量
4)自动分片到后端多个redis实例上
多种hash算法:md5、crc16、crc32 、crc32a、fnv1_64、fnv1a_64、fnv1_32、fnv1a_32、hsieh、murmur、jenkins
多种分片算法:ketama(一致性hash算法的一种实现)、modula、random
可以设置后端实例的权重
5)避免单点问题
可以平行部署多个代理层,通过HAProxy做负载均衡,将redis的读写分散到多个twemproxy上。
6)支持状态监控
可设置状态监控ip和端口,访问ip和端口可以得到一个json格式的状态信息串;可设置监控信息刷新间隔时间
7)使用 pipelining 处理请求和响应
连接复用,内存复用; 将多个连接请求,组成reids pipelining统一向redis请求
8)并不是支持所有redis命令
不支持redis的事务操作; 使用SIDFF, SDIFFSTORE, SINTER, SINTERSTORE, SMOVE, SUNION and SUNIONSTORE命令需要保证key都在同一个分片上。
搭建环境
twemproxy单点问题可以通过keepalived解决。redis需要做主从。
安装autoconf
shwget http://ftp.gnu.org/gnu/autoconf/autoconf-2.69.tar.gz tar -zxvf autoconf-2.69.tar.gz cd autoconf-2.69 ./configure && make && make install安装 automake
shwget http://ftp.gnu.org/gnu/automake/automake-1.15.tar.gz tar -zvxf automake-1.15.tar.gz cd automake-1.15 ./configure && make && make install安装libtool
shwget https://ftp.gnu.org/gnu/libtool/libtool-2.4.6.tar.gz tar -zvxf libtool-2.4.6.tar.gz cd libtool-2.4.6 ./configure && make && make install安装twemproxy
shwget https://github.com/twitter/twemproxy/archive/master.zip unzip master cd twemproxy-master aclocal autoreconf -f -i -Wall,no-obsolete mkdir /usr/local/twemproxy ./configure --prefix=/usr/local/twemproxy/ make && make install配置twemproxy
sh# 将twemproxy-master下的conf目录复制到/usr/local/twemproxy下 cp -r ./conf /usr/local/twemproxy/ cd /usr/local/twemproxy/修改conf目录下nutcracker.yml文件内容
ymlalpha: listen: 192.168.93.134:22121 hash: fnv1a_64 distribution: ketama auto_eject_hosts: true redis: true server_retry_timeout: 2000 server_failure_limit: 1 servers: - 192.168.93.133:6379:1 - 192.168.93.134:6379:1